diff --git a/backend/backend/settings/base.py b/backend/backend/settings/base.py index 104ffc9f6..4aed37834 100644 --- a/backend/backend/settings/base.py +++ b/backend/backend/settings/base.py @@ -250,6 +250,7 @@ def get_required_setting( "prompt_studio.prompt_studio_document_manager_v2", "prompt_studio.prompt_studio_index_manager_v2", "tags", + "execution", ) TENANT_APPS = [] diff --git a/backend/backend/urls_v2.py b/backend/backend/urls_v2.py index 0a8bd0f95..213b9a7bf 100644 --- a/backend/backend/urls_v2.py +++ b/backend/backend/urls_v2.py @@ -59,4 +59,5 @@ include("prompt_studio.prompt_studio_index_manager_v2.urls"), ), path("tags/", include("tags.urls")), + path("execution/", include("execution.urls")), ] diff --git a/backend/execution/__init__.py b/backend/execution/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/execution/apps.py b/backend/execution/apps.py new file mode 100644 index 000000000..696ae5ea0 --- /dev/null +++ b/backend/execution/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class ExecutionConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "execution" diff --git a/backend/execution/migrations/__init__.py b/backend/execution/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/execution/serializer.py b/backend/execution/serializer.py new file mode 100644 index 000000000..b17f40fa1 --- /dev/null +++ b/backend/execution/serializer.py @@ -0,0 +1,38 @@ +from api_v2.models import APIDeployment +from pipeline_v2.models import Pipeline +from rest_framework import serializers +from workflow_manager.workflow_v2.models import Workflow, WorkflowExecution + + +class ExecutionSerializer(serializers.ModelSerializer): + workflow_name = serializers.SerializerMethodField() + pipeline_name = serializers.SerializerMethodField() + + class Meta: + model = WorkflowExecution + fields = "__all__" + + def get_workflow_name(self, obj): + """Fetch the workflow name using workflow_id""" + # TODO: Update after making Workflow a foreign key + # return obj.workflow.workflow_name if obj.workflow_id else None + if workflow := Workflow.objects.filter(id=obj.workflow_id).first(): + return workflow.workflow_name + return None + + def get_pipeline_name(self, obj): + """Fetch the pipeline or API deployment name""" + if not obj.pipeline_id: + return None + + # Check if pipeline_id exists in Pipeline model + pipeline = Pipeline.objects.filter(id=obj.pipeline_id).first() + if pipeline: + return pipeline.pipeline_name + + # If not found in Pipeline, check APIDeployment model + api_deployment = APIDeployment.objects.filter(id=obj.pipeline_id).first() + if api_deployment: + return api_deployment.display_name + + return None diff --git a/backend/execution/urls.py b/backend/execution/urls.py new file mode 100644 index 000000000..672cb32a7 --- /dev/null +++ b/backend/execution/urls.py @@ -0,0 +1,17 @@ +from django.urls import path +from execution.views import ExecutionViewSet +from rest_framework.urlpatterns import format_suffix_patterns + +execution_list = ExecutionViewSet.as_view( + { + "get": "list", + } +) +execution_detail = ExecutionViewSet.as_view({"get": "retrieve", "delete": "destroy"}) + +urlpatterns = format_suffix_patterns( + [ + path("", execution_list, name="execution-list"), + path("/", execution_detail, name="execution-detail"), + ] +) diff --git a/backend/execution/views.py b/backend/execution/views.py new file mode 100644 index 000000000..0b877d718 --- /dev/null +++ b/backend/execution/views.py @@ -0,0 +1,69 @@ +import logging +from typing import Optional + +from api_v2.models import APIDeployment +from django.db.models import Q, QuerySet +from execution.serializer import ExecutionSerializer +from permissions.permission import IsOrganizationMember +from pipeline_v2.models import Pipeline +from rest_framework import viewsets +from rest_framework.filters import OrderingFilter +from rest_framework.permissions import IsAuthenticated +from utils.date import DateRangeKeys, DateRangeSerializer +from utils.pagination import CustomPagination +from workflow_manager.workflow_v2.models import Workflow, WorkflowExecution + +logger = logging.getLogger(__name__) + + +class ExecutionViewSet(viewsets.ModelViewSet): + permission_classes = [IsAuthenticated, IsOrganizationMember] + serializer_class = ExecutionSerializer + pagination_class = CustomPagination + filter_backends = [OrderingFilter] + ordering_fields = ["created_at"] + ordering = ["-created_at"] + + def get_queryset(self) -> Optional[QuerySet]: + execution_entity = self.request.query_params.get("execution_entity") + + queryset = WorkflowExecution.objects.all() + + # Filter based on execution entity + if execution_entity == "API": + queryset = queryset.filter( + pipeline_id__in=APIDeployment.objects.values_list("id", flat=True) + ) + elif execution_entity == "ETL": + queryset = queryset.filter( + pipeline_id__in=Pipeline.objects.filter( + pipeline_type=Pipeline.PipelineType.ETL + ).values_list("id", flat=True) + ) + elif execution_entity == "TASK": + queryset = queryset.filter( + pipeline_id__in=Pipeline.objects.filter( + pipeline_type=Pipeline.PipelineType.TASK + ).values_list("id", flat=True) + ) + elif execution_entity == "WF": + queryset = queryset.filter( + pipeline_id=None, + workflow_id__in=Workflow.objects.values_list("id", flat=True), + ) + + # Parse and apply date filters + date_range_serializer = DateRangeSerializer(data=self.request.query_params) + date_range_serializer.is_valid(raise_exception=True) + + filters = Q() + if start_date := date_range_serializer.validated_data.get( + DateRangeKeys.START_DATE + ): + filters &= Q(created_at__gte=start_date) + if end_date := date_range_serializer.validated_data.get(DateRangeKeys.END_DATE): + filters &= Q(created_at__lte=end_date) + + queryset = queryset.filter(filters) + + return queryset diff --git a/backend/pipeline_v2/execution_view.py b/backend/pipeline_v2/execution_view.py index fa514615a..56e8e06e5 100644 --- a/backend/pipeline_v2/execution_view.py +++ b/backend/pipeline_v2/execution_view.py @@ -1,7 +1,7 @@ from permissions.permission import IsOwner -from pipeline_v2.serializers.execute import DateRangeSerializer from rest_framework import viewsets from rest_framework.versioning import URLPathVersioning +from utils.date import DateRangeKeys, DateRangeSerializer from utils.pagination import CustomPagination from workflow_manager.workflow_v2.models.execution import WorkflowExecution from workflow_manager.workflow_v2.serializers import WorkflowExecutionSerializer @@ -14,8 +14,6 @@ class PipelineExecutionViewSet(viewsets.ModelViewSet): pagination_class = CustomPagination CREATED_AT_FIELD_DESC = "-created_at" - START_DATE_FIELD = "start_date" - END_DATE_FIELD = "end_date" def get_queryset(self): # Get the pipeline_id from the URL path @@ -25,8 +23,8 @@ def get_queryset(self): # Validate start_date and end_date parameters using DateRangeSerializer date_range_serializer = DateRangeSerializer(data=self.request.query_params) date_range_serializer.is_valid(raise_exception=True) - start_date = date_range_serializer.validated_data.get(self.START_DATE_FIELD) - end_date = date_range_serializer.validated_data.get(self.END_DATE_FIELD) + start_date = date_range_serializer.validated_data.get(DateRangeKeys.START_DATE) + end_date = date_range_serializer.validated_data.get(DateRangeKeys.END_DATE) if start_date and end_date: queryset = queryset.filter(created_at__range=(start_date, end_date)) diff --git a/backend/pipeline_v2/serializers/execute.py b/backend/pipeline_v2/serializers/execute.py index 4e5a7fd58..0f0457a37 100644 --- a/backend/pipeline_v2/serializers/execute.py +++ b/backend/pipeline_v2/serializers/execute.py @@ -17,8 +17,3 @@ def validate_pipeline_id(self, value: str) -> str: except Pipeline.DoesNotExist: raise serializers.ValidationError("Invalid pipeline ID") return value - - -class DateRangeSerializer(serializers.Serializer): - start_date = serializers.DateTimeField(required=False) - end_date = serializers.DateTimeField(required=False) diff --git a/backend/usage_v2/utils.py b/backend/usage_v2/utils.py index 7bdd6ce6f..0ad0d3443 100644 --- a/backend/usage_v2/utils.py +++ b/backend/usage_v2/utils.py @@ -116,7 +116,6 @@ def filter_date_range(cls, value: str) -> Optional[DateRange]: if not preset: return None start_date, end_date = preset.get_date_range() - print(start_date, end_date) return cls._validate_date_range(start_date=start_date, end_date=end_date) @classmethod diff --git a/backend/utils/date/__init__.py b/backend/utils/date/__init__.py new file mode 100644 index 000000000..faf94e125 --- /dev/null +++ b/backend/utils/date/__init__.py @@ -0,0 +1,2 @@ +from .constants import DateRangeKeys # noqa: F401 +from .serializer import DateRangeSerializer # noqa: F401 diff --git a/backend/utils/date/constants.py b/backend/utils/date/constants.py new file mode 100644 index 000000000..1d5a3bcdf --- /dev/null +++ b/backend/utils/date/constants.py @@ -0,0 +1,3 @@ +class DateRangeKeys: + START_DATE = "start_date" + END_DATE = "end_date" diff --git a/backend/utils/date/serializer.py b/backend/utils/date/serializer.py new file mode 100644 index 000000000..054d700bb --- /dev/null +++ b/backend/utils/date/serializer.py @@ -0,0 +1,6 @@ +from rest_framework import serializers + + +class DateRangeSerializer(serializers.Serializer): + start_date = serializers.DateTimeField(required=False) + end_date = serializers.DateTimeField(required=False) diff --git a/backend/workflow_manager/workflow_v2/workflow_helper.py b/backend/workflow_manager/workflow_v2/workflow_helper.py index 4478ad359..4111291aa 100644 --- a/backend/workflow_manager/workflow_v2/workflow_helper.py +++ b/backend/workflow_manager/workflow_v2/workflow_helper.py @@ -368,6 +368,8 @@ def run_workflow( ) raise finally: + # TODO: Handle error gracefully during delete + # Mark status as an ERROR correctly destination.delete_execution_directory() @staticmethod