Skip to content

Commit

Permalink
feat: API to list executions and filter by entity and date range with…
Browse files Browse the repository at this point in the history
… pagination, ordering
  • Loading branch information
chandrasekharan-zipstack committed Jan 29, 2025
1 parent 10f48e0 commit 667e6d5
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 11 deletions.
1 change: 1 addition & 0 deletions backend/backend/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def get_required_setting(
"prompt_studio.prompt_studio_document_manager_v2",
"prompt_studio.prompt_studio_index_manager_v2",
"tags",
"execution",
)
TENANT_APPS = []

Expand Down
1 change: 1 addition & 0 deletions backend/backend/urls_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,5 @@
include("prompt_studio.prompt_studio_index_manager_v2.urls"),
),
path("tags/", include("tags.urls")),
path("execution/", include("execution.urls")),
]
Empty file added backend/execution/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions backend/execution/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class ExecutionConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "execution"
Empty file.
38 changes: 38 additions & 0 deletions backend/execution/serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from api_v2.models import APIDeployment
from pipeline_v2.models import Pipeline
from rest_framework import serializers
from workflow_manager.workflow_v2.models import Workflow, WorkflowExecution


class ExecutionSerializer(serializers.ModelSerializer):
workflow_name = serializers.SerializerMethodField()
pipeline_name = serializers.SerializerMethodField()

class Meta:
model = WorkflowExecution
fields = "__all__"

def get_workflow_name(self, obj):
"""Fetch the workflow name using workflow_id"""
# TODO: Update after making Workflow a foreign key
# return obj.workflow.workflow_name if obj.workflow_id else None
if workflow := Workflow.objects.filter(id=obj.workflow_id).first():
return workflow.workflow_name
return None

def get_pipeline_name(self, obj):
"""Fetch the pipeline or API deployment name"""
if not obj.pipeline_id:
return None

# Check if pipeline_id exists in Pipeline model
pipeline = Pipeline.objects.filter(id=obj.pipeline_id).first()
if pipeline:
return pipeline.pipeline_name

# If not found in Pipeline, check APIDeployment model
api_deployment = APIDeployment.objects.filter(id=obj.pipeline_id).first()
if api_deployment:
return api_deployment.display_name

return None
17 changes: 17 additions & 0 deletions backend/execution/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from django.urls import path
from execution.views import ExecutionViewSet
from rest_framework.urlpatterns import format_suffix_patterns

execution_list = ExecutionViewSet.as_view(
{
"get": "list",
}
)
execution_detail = ExecutionViewSet.as_view({"get": "retrieve", "delete": "destroy"})

urlpatterns = format_suffix_patterns(
[
path("", execution_list, name="execution-list"),
path("<uuid:pk>/", execution_detail, name="execution-detail"),
]
)
69 changes: 69 additions & 0 deletions backend/execution/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import logging
from typing import Optional

from api_v2.models import APIDeployment
from django.db.models import Q, QuerySet
from execution.serializer import ExecutionSerializer
from permissions.permission import IsOrganizationMember
from pipeline_v2.models import Pipeline
from rest_framework import viewsets
from rest_framework.filters import OrderingFilter
from rest_framework.permissions import IsAuthenticated
from utils.date import DateRangeKeys, DateRangeSerializer
from utils.pagination import CustomPagination
from workflow_manager.workflow_v2.models import Workflow, WorkflowExecution

logger = logging.getLogger(__name__)


class ExecutionViewSet(viewsets.ModelViewSet):
permission_classes = [IsAuthenticated, IsOrganizationMember]
serializer_class = ExecutionSerializer
pagination_class = CustomPagination
filter_backends = [OrderingFilter]
ordering_fields = ["created_at"]
ordering = ["-created_at"]

def get_queryset(self) -> Optional[QuerySet]:
execution_entity = self.request.query_params.get("execution_entity")

queryset = WorkflowExecution.objects.all()

# Filter based on execution entity
if execution_entity == "API":
queryset = queryset.filter(
pipeline_id__in=APIDeployment.objects.values_list("id", flat=True)
)
elif execution_entity == "ETL":
queryset = queryset.filter(
pipeline_id__in=Pipeline.objects.filter(
pipeline_type=Pipeline.PipelineType.ETL
).values_list("id", flat=True)
)
elif execution_entity == "TASK":
queryset = queryset.filter(
pipeline_id__in=Pipeline.objects.filter(
pipeline_type=Pipeline.PipelineType.TASK
).values_list("id", flat=True)
)
elif execution_entity == "WF":
queryset = queryset.filter(
pipeline_id=None,
workflow_id__in=Workflow.objects.values_list("id", flat=True),
)

# Parse and apply date filters
date_range_serializer = DateRangeSerializer(data=self.request.query_params)
date_range_serializer.is_valid(raise_exception=True)

filters = Q()
if start_date := date_range_serializer.validated_data.get(
DateRangeKeys.START_DATE
):
filters &= Q(created_at__gte=start_date)
if end_date := date_range_serializer.validated_data.get(DateRangeKeys.END_DATE):
filters &= Q(created_at__lte=end_date)

queryset = queryset.filter(filters)

return queryset
8 changes: 3 additions & 5 deletions backend/pipeline_v2/execution_view.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from permissions.permission import IsOwner
from pipeline_v2.serializers.execute import DateRangeSerializer
from rest_framework import viewsets
from rest_framework.versioning import URLPathVersioning
from utils.date import DateRangeKeys, DateRangeSerializer
from utils.pagination import CustomPagination
from workflow_manager.workflow_v2.models.execution import WorkflowExecution
from workflow_manager.workflow_v2.serializers import WorkflowExecutionSerializer
Expand All @@ -14,8 +14,6 @@ class PipelineExecutionViewSet(viewsets.ModelViewSet):
pagination_class = CustomPagination

CREATED_AT_FIELD_DESC = "-created_at"
START_DATE_FIELD = "start_date"
END_DATE_FIELD = "end_date"

def get_queryset(self):
# Get the pipeline_id from the URL path
Expand All @@ -25,8 +23,8 @@ def get_queryset(self):
# Validate start_date and end_date parameters using DateRangeSerializer
date_range_serializer = DateRangeSerializer(data=self.request.query_params)
date_range_serializer.is_valid(raise_exception=True)
start_date = date_range_serializer.validated_data.get(self.START_DATE_FIELD)
end_date = date_range_serializer.validated_data.get(self.END_DATE_FIELD)
start_date = date_range_serializer.validated_data.get(DateRangeKeys.START_DATE)
end_date = date_range_serializer.validated_data.get(DateRangeKeys.END_DATE)

if start_date and end_date:
queryset = queryset.filter(created_at__range=(start_date, end_date))
Expand Down
5 changes: 0 additions & 5 deletions backend/pipeline_v2/serializers/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,3 @@ def validate_pipeline_id(self, value: str) -> str:
except Pipeline.DoesNotExist:
raise serializers.ValidationError("Invalid pipeline ID")
return value


class DateRangeSerializer(serializers.Serializer):
start_date = serializers.DateTimeField(required=False)
end_date = serializers.DateTimeField(required=False)
1 change: 0 additions & 1 deletion backend/usage_v2/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ def filter_date_range(cls, value: str) -> Optional[DateRange]:
if not preset:
return None
start_date, end_date = preset.get_date_range()
print(start_date, end_date)
return cls._validate_date_range(start_date=start_date, end_date=end_date)

@classmethod
Expand Down
2 changes: 2 additions & 0 deletions backend/utils/date/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .constants import DateRangeKeys # noqa: F401
from .serializer import DateRangeSerializer # noqa: F401
3 changes: 3 additions & 0 deletions backend/utils/date/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class DateRangeKeys:
START_DATE = "start_date"
END_DATE = "end_date"
6 changes: 6 additions & 0 deletions backend/utils/date/serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from rest_framework import serializers


class DateRangeSerializer(serializers.Serializer):
start_date = serializers.DateTimeField(required=False)
end_date = serializers.DateTimeField(required=False)
2 changes: 2 additions & 0 deletions backend/workflow_manager/workflow_v2/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ def run_workflow(
)
raise
finally:
# TODO: Handle error gracefully during delete
# Mark status as an ERROR correctly
destination.delete_execution_directory()

@staticmethod
Expand Down

0 comments on commit 667e6d5

Please sign in to comment.