Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: API to list executions and filter by entity type #1103

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/backend/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def get_required_setting(
"prompt_studio.prompt_studio_document_manager_v2",
"prompt_studio.prompt_studio_index_manager_v2",
"tags",
"execution",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a new execution application instead of using our existing workflow_manager/workflow? Is there a specific reason for separating it from the existing setup?

Is this not part of workflow_execution?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The URL prefix for all the apps in workflow_manager is /workflow. I wanted to use a different URL prefix, execution, for these new endpoints, so I considered adding a new app altogether. Let me see if I can move these files into the existing apps instead; however, these APIs need to support WF / pipeline runs and prompt studio logs as well.

)
TENANT_APPS = []

Expand Down
1 change: 1 addition & 0 deletions backend/backend/urls_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,5 @@
include("prompt_studio.prompt_studio_index_manager_v2.urls"),
),
path("tags/", include("tags.urls")),
path("execution/", include("execution.urls")),
]
Empty file added backend/execution/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions backend/execution/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class ExecutionConfig(AppConfig):
    """Django application configuration for the ``execution`` app."""

    # Field type Django uses for implicit primary keys on this app's models.
    default_auto_field = "django.db.models.BigAutoField"
    # Dotted Python path of the application this config applies to.
    name = "execution"
Empty file.
38 changes: 38 additions & 0 deletions backend/execution/serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from api_v2.models import APIDeployment
from pipeline_v2.models import Pipeline
from rest_framework import serializers
from workflow_manager.workflow_v2.models import Workflow, WorkflowExecution


class ExecutionSerializer(serializers.ModelSerializer):
workflow_name = serializers.SerializerMethodField()
pipeline_name = serializers.SerializerMethodField()

class Meta:
model = WorkflowExecution
fields = "__all__"

def get_workflow_name(self, obj):
"""Fetch the workflow name using workflow_id"""
# TODO: Update after making Workflow a foreign key
# return obj.workflow.workflow_name if obj.workflow_id else None
if workflow := Workflow.objects.filter(id=obj.workflow_id).first():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use `.get` instead of `filter` and `first`?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better if we added it to the Workflow model as a property or method.

return workflow.workflow_name
return None

def get_pipeline_name(self, obj):
"""Fetch the pipeline or API deployment name"""
if not obj.pipeline_id:
return None

# Check if pipeline_id exists in Pipeline model
pipeline = Pipeline.objects.filter(id=obj.pipeline_id).first()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same feedback as explained for the workflow lookup.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a helper function is also available in Pipeline or APIDeployment.

if pipeline:
return pipeline.pipeline_name

# If not found in Pipeline, check APIDeployment model
api_deployment = APIDeployment.objects.filter(id=obj.pipeline_id).first()
if api_deployment:
return api_deployment.display_name

return None
17 changes: 17 additions & 0 deletions backend/execution/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from django.urls import path
from execution.views import ExecutionViewSet
from rest_framework.urlpatterns import format_suffix_patterns

# Collection route: only GET is exposed, mapped to the viewset's ``list`` action.
execution_list = ExecutionViewSet.as_view(
    {
        "get": "list",
    }
)
# Detail route: GET maps to ``retrieve`` and DELETE maps to ``destroy``.
execution_detail = ExecutionViewSet.as_view({"get": "retrieve", "delete": "destroy"})

# Routes are wrapped with DRF's format_suffix_patterns helper.
urlpatterns = format_suffix_patterns(
    [
        path("", execution_list, name="execution-list"),
        # The ``uuid`` converter restricts this route to UUID primary keys.
        path("<uuid:pk>/", execution_detail, name="execution-detail"),
    ]
)
69 changes: 69 additions & 0 deletions backend/execution/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import logging
from typing import Optional

from api_v2.models import APIDeployment
from django.db.models import Q, QuerySet
from execution.serializer import ExecutionSerializer
from permissions.permission import IsOrganizationMember
from pipeline_v2.models import Pipeline
from rest_framework import viewsets
from rest_framework.filters import OrderingFilter
from rest_framework.permissions import IsAuthenticated
from utils.date import DateRangeKeys, DateRangeSerializer
from utils.pagination import CustomPagination
from workflow_manager.workflow_v2.models import Workflow, WorkflowExecution

logger = logging.getLogger(__name__)


class ExecutionViewSet(viewsets.ModelViewSet):
permission_classes = [IsAuthenticated, IsOrganizationMember]
serializer_class = ExecutionSerializer
pagination_class = CustomPagination
filter_backends = [OrderingFilter]
ordering_fields = ["created_at"]
ordering = ["-created_at"]

def get_queryset(self) -> Optional[QuerySet]:
execution_entity = self.request.query_params.get("execution_entity")

queryset = WorkflowExecution.objects.all()

# Filter based on execution entity
if execution_entity == "API":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use an enum for this?

queryset = queryset.filter(
pipeline_id__in=APIDeployment.objects.values_list("id", flat=True)
)
elif execution_entity == "ETL":
queryset = queryset.filter(
pipeline_id__in=Pipeline.objects.filter(
pipeline_type=Pipeline.PipelineType.ETL
).values_list("id", flat=True)
)
elif execution_entity == "TASK":
queryset = queryset.filter(
pipeline_id__in=Pipeline.objects.filter(
pipeline_type=Pipeline.PipelineType.TASK
).values_list("id", flat=True)
)
elif execution_entity == "WF":
queryset = queryset.filter(
pipeline_id=None,
workflow_id__in=Workflow.objects.values_list("id", flat=True),
)

# Parse and apply date filters
date_range_serializer = DateRangeSerializer(data=self.request.query_params)
date_range_serializer.is_valid(raise_exception=True)
Comment on lines +55 to +57
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: If we use this one as a serializer, add it as a base class of the main serializer if possible. Another possibility: can we use DateTimeProcessor and a filter class here?


filters = Q()
if start_date := date_range_serializer.validated_data.get(
DateRangeKeys.START_DATE
):
filters &= Q(created_at__gte=start_date)
if end_date := date_range_serializer.validated_data.get(DateRangeKeys.END_DATE):
filters &= Q(created_at__lte=end_date)

queryset = queryset.filter(filters)

return queryset
8 changes: 3 additions & 5 deletions backend/pipeline_v2/execution_view.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from permissions.permission import IsOwner
from pipeline_v2.serializers.execute import DateRangeSerializer
from rest_framework import viewsets
from rest_framework.versioning import URLPathVersioning
from utils.date import DateRangeKeys, DateRangeSerializer
from utils.pagination import CustomPagination
from workflow_manager.workflow_v2.models.execution import WorkflowExecution
from workflow_manager.workflow_v2.serializers import WorkflowExecutionSerializer
Expand All @@ -14,8 +14,6 @@ class PipelineExecutionViewSet(viewsets.ModelViewSet):
pagination_class = CustomPagination

CREATED_AT_FIELD_DESC = "-created_at"
START_DATE_FIELD = "start_date"
END_DATE_FIELD = "end_date"

def get_queryset(self):
# Get the pipeline_id from the URL path
Expand All @@ -25,8 +23,8 @@ def get_queryset(self):
# Validate start_date and end_date parameters using DateRangeSerializer
date_range_serializer = DateRangeSerializer(data=self.request.query_params)
date_range_serializer.is_valid(raise_exception=True)
start_date = date_range_serializer.validated_data.get(self.START_DATE_FIELD)
end_date = date_range_serializer.validated_data.get(self.END_DATE_FIELD)
start_date = date_range_serializer.validated_data.get(DateRangeKeys.START_DATE)
end_date = date_range_serializer.validated_data.get(DateRangeKeys.END_DATE)

if start_date and end_date:
queryset = queryset.filter(created_at__range=(start_date, end_date))
Expand Down
5 changes: 0 additions & 5 deletions backend/pipeline_v2/serializers/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,3 @@ def validate_pipeline_id(self, value: str) -> str:
except Pipeline.DoesNotExist:
raise serializers.ValidationError("Invalid pipeline ID")
return value


class DateRangeSerializer(serializers.Serializer):
start_date = serializers.DateTimeField(required=False)
end_date = serializers.DateTimeField(required=False)
1 change: 0 additions & 1 deletion backend/usage_v2/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ def filter_date_range(cls, value: str) -> Optional[DateRange]:
if not preset:
return None
start_date, end_date = preset.get_date_range()
print(start_date, end_date)
return cls._validate_date_range(start_date=start_date, end_date=end_date)

@classmethod
Expand Down
2 changes: 2 additions & 0 deletions backend/utils/date/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .constants import DateRangeKeys # noqa: F401
from .serializer import DateRangeSerializer # noqa: F401
3 changes: 3 additions & 0 deletions backend/utils/date/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class DateRangeKeys:
    """Key names used to read date-range bounds from validated request data."""

    # Key for the lower bound of the range.
    START_DATE = "start_date"
    # Key for the upper bound of the range.
    END_DATE = "end_date"
6 changes: 6 additions & 0 deletions backend/utils/date/serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from rest_framework import serializers


class DateRangeSerializer(serializers.Serializer):
    """Validate an optional ``start_date`` / ``end_date`` pair of datetimes.

    Both fields are declared with ``required=False``, so either bound (or
    both) may be omitted from the input data.
    """

    start_date = serializers.DateTimeField(required=False)
    end_date = serializers.DateTimeField(required=False)
2 changes: 2 additions & 0 deletions backend/workflow_manager/workflow_v2/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ def run_workflow(
)
raise
finally:
# TODO: Handle error gracefully during delete
# Mark status as an ERROR correctly
destination.delete_execution_directory()

@staticmethod
Expand Down