Skip to content

Commit

Permalink
Moved execution app into workflow_manager, added API to list file exe…
Browse files Browse the repository at this point in the history
…cutions for an execution with latest logs, schema migration for indexes on some columns
  • Loading branch information
chandrasekharan-zipstack committed Feb 4, 2025
1 parent 0ffca6b commit 71e8dd1
Show file tree
Hide file tree
Showing 19 changed files with 199 additions and 68 deletions.
2 changes: 1 addition & 1 deletion backend/backend/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def get_required_setting(
"workflow_manager.file_execution",
"workflow_manager.endpoint_v2",
"workflow_manager.workflow_v2",
"workflow_manager.execution",
"tool_instance_v2",
"pipeline_v2",
"platform_settings_v2",
Expand All @@ -250,7 +251,6 @@ def get_required_setting(
"prompt_studio.prompt_studio_document_manager_v2",
"prompt_studio.prompt_studio_index_manager_v2",
"tags",
"execution",
)
TENANT_APPS = []

Expand Down
2 changes: 1 addition & 1 deletion backend/backend/urls_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@
include("prompt_studio.prompt_studio_index_manager_v2.urls"),
),
path("tags/", include("tags.urls")),
path("execution/", include("execution.urls")),
path("execution/", include("workflow_manager.execution.urls")),
]
Empty file.
38 changes: 0 additions & 38 deletions backend/execution/serializer.py

This file was deleted.

17 changes: 0 additions & 17 deletions backend/execution/urls.py

This file was deleted.

File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

class ExecutionConfig(AppConfig):
    """Django application configuration for the execution app."""

    default_auto_field = "django.db.models.BigAutoField"
    # Dotted path of the app package. The earlier `name = "execution"`
    # assignment was immediately overwritten by this one, so it is dropped.
    name = "workflow_manager.execution"
8 changes: 8 additions & 0 deletions backend/workflow_manager/execution/enum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from enum import Enum


class ExecutionEntity(str, Enum):
    """Kinds of entity that workflow executions can be filtered by.

    The ``str`` mixin makes each member compare equal to its raw value
    (``ExecutionEntity.API == "API"``). Equality checks therefore hold
    whether the other operand is a member or a plain string; a member of a
    plain ``Enum`` never compares equal to a string.
    """

    ETL = "ETL"
    API = "API"
    TASK = "TASK"
    WORKFLOW = "WF"
2 changes: 2 additions & 0 deletions backend/workflow_manager/execution/serializer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .execution import ExecutionSerializer # noqa: F401
from .file_centric import FileCentricExecutionSerializer # noqa: F401
22 changes: 22 additions & 0 deletions backend/workflow_manager/execution/serializer/execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import Optional

from rest_framework import serializers
from workflow_manager.workflow_v2.models import WorkflowExecution


# TODO: Optimize with select_related / prefetch_related to reduce DB queries
class ExecutionSerializer(serializers.ModelSerializer):
    """Serialize a ``WorkflowExecution`` plus two derived name fields.

    ``workflow_name`` and ``pipeline_name`` are read from the attributes of
    the same name on the model instance.
    """

    workflow_name = serializers.SerializerMethodField()
    pipeline_name = serializers.SerializerMethodField()

    class Meta:
        model = WorkflowExecution
        exclude = ["task_id", "execution_log_id", "execution_type"]

    def get_workflow_name(self, obj: WorkflowExecution) -> Optional[str]:
        """Return the name of the workflow tied to this execution."""
        return obj.workflow_name

    def get_pipeline_name(self, obj: WorkflowExecution) -> Optional[str]:
        """Return the pipeline or API deployment name of this execution."""
        return obj.pipeline_name
22 changes: 22 additions & 0 deletions backend/workflow_manager/execution/serializer/file_centric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import Any, Optional

from rest_framework import serializers
from workflow_manager.file_execution.models import (
    WorkflowFileExecution as FileExecution,
)


class FileCentricExecutionSerializer(serializers.ModelSerializer):
    """Serialize a file execution together with its most recent log entry."""

    latest_log = serializers.SerializerMethodField()

    class Meta:
        model = FileExecution
        exclude = ["file_hash"]

    def get_latest_log(self, obj: FileExecution) -> Optional[dict[str, Any]]:
        """Return the payload of the newest non-DEBUG/WARN log of ``obj``.

        Args:
            obj: File execution whose ``execution_logs`` are searched.

        Returns:
            The ``data`` of the log with the latest ``event_time`` once
            entries whose ``log_level`` is DEBUG or WARN are excluded, or
            ``None`` when no such log exists.
        """
        # NOTE: this issues one query for every serialized file execution.
        latest_log = (
            obj.execution_logs.exclude(data__log_level__in=["DEBUG", "WARN"])
            .order_by("-event_time")
            .first()
        )
        return latest_log.data if latest_log else None
25 changes: 25 additions & 0 deletions backend/workflow_manager/execution/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from django.urls import path
from rest_framework.urlpatterns import format_suffix_patterns
from workflow_manager.execution.views import (
ExecutionViewSet,
FileCentricExecutionViewSet,
)
from workflow_manager.workflow_v2.execution_log_view import (
WorkflowExecutionLogViewSet as ExecutionLogViewSet,
)

# Every viewset is exposed only through its "list" action on GET.
execution_list = ExecutionViewSet.as_view({"get": "list"})
file_centric_list = FileCentricExecutionViewSet.as_view({"get": "list"})
execution_log_list = ExecutionLogViewSet.as_view({"get": "list"})

urlpatterns = format_suffix_patterns(
    [
        # Listing of executions.
        path("", execution_list, name="execution-list"),
        # Listings nested under a single UUID ``pk`` captured from the URL.
        path("<uuid:pk>/files/", file_centric_list, name="file-centric-execution-list"),
        path("<uuid:pk>/logs/", execution_log_list, name="execution-log"),
    ]
)
2 changes: 2 additions & 0 deletions backend/workflow_manager/execution/views/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .execution import ExecutionViewSet # noqa: F401
from .file_centric import FileCentricExecutionViewSet # noqa: F401
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@

from api_v2.models import APIDeployment
from django.db.models import Q, QuerySet
from execution.serializer import ExecutionSerializer
from permissions.permission import IsOrganizationMember
from pipeline_v2.models import Pipeline
from rest_framework import viewsets
from rest_framework.filters import OrderingFilter
from rest_framework.permissions import IsAuthenticated
from utils.date import DateRangeKeys, DateRangeSerializer
from utils.pagination import CustomPagination
from workflow_manager.execution.enum import ExecutionEntity
from workflow_manager.execution.serializer import ExecutionSerializer
from workflow_manager.workflow_v2.models import Workflow, WorkflowExecution

logger = logging.getLogger(__name__)


class ExecutionViewSet(viewsets.ModelViewSet):
permission_classes = [IsAuthenticated, IsOrganizationMember]
class ExecutionViewSet(viewsets.ReadOnlyModelViewSet):
permission_classes = [IsAuthenticated]
serializer_class = ExecutionSerializer
pagination_class = CustomPagination
filter_backends = [OrderingFilter]
Expand All @@ -30,23 +30,23 @@ def get_queryset(self) -> Optional[QuerySet]:
queryset = WorkflowExecution.objects.all()

# Filter based on execution entity
if execution_entity == "API":
if execution_entity == ExecutionEntity.API:
queryset = queryset.filter(
pipeline_id__in=APIDeployment.objects.values_list("id", flat=True)
)
elif execution_entity == "ETL":
elif execution_entity == ExecutionEntity.ETL:
queryset = queryset.filter(
pipeline_id__in=Pipeline.objects.filter(
pipeline_type=Pipeline.PipelineType.ETL
).values_list("id", flat=True)
)
elif execution_entity == "TASK":
elif execution_entity == ExecutionEntity.TASK:
queryset = queryset.filter(
pipeline_id__in=Pipeline.objects.filter(
pipeline_type=Pipeline.PipelineType.TASK
).values_list("id", flat=True)
)
elif execution_entity == "WF":
elif execution_entity == ExecutionEntity.WORKFLOW:
queryset = queryset.filter(
pipeline_id=None,
workflow_id__in=Workflow.objects.values_list("id", flat=True),
Expand Down
25 changes: 25 additions & 0 deletions backend/workflow_manager/execution/views/file_centric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import logging

from rest_framework import viewsets
from rest_framework.filters import OrderingFilter
from rest_framework.permissions import IsAuthenticated
from utils.pagination import CustomPagination
from workflow_manager.execution.serializer import FileCentricExecutionSerializer
from workflow_manager.file_execution.models import (
WorkflowFileExecution as FileExecution,
)

logger = logging.getLogger(__name__)


class FileCentricExecutionViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only, paginated listing of the file executions of one execution.

    Results are ordered by ``created_at`` by default, and that is the only
    field clients may order by.
    """

    permission_classes = [IsAuthenticated]
    serializer_class = FileCentricExecutionSerializer
    pagination_class = CustomPagination
    filter_backends = [OrderingFilter]
    ordering_fields = ["created_at"]
    ordering = ["created_at"]

    def get_queryset(self):
        """Return file executions whose workflow execution is the URL ``pk``."""
        return FileExecution.objects.filter(
            workflow_execution_id=self.kwargs.get("pk")
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Generated by Django 4.2.1 on 2025-02-04 04:12

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add two composite indexes to the ``workflowexecution`` model.

    Each index pairs an ID column with ``created_at`` in descending order.
    """

    dependencies = [
        ("workflow_v2", "0005_workflowexecution_tags"),
    ]

    operations = [
        # Index on (workflow_id, created_at DESC).
        migrations.AddIndex(
            model_name="workflowexecution",
            index=models.Index(
                fields=["workflow_id", "-created_at"],
                name="workflow_ex_workflo_5942c9_idx",
            ),
        ),
        # Index on (pipeline_id, created_at DESC).
        migrations.AddIndex(
            model_name="workflowexecution",
            index=models.Index(
                fields=["pipeline_id", "-created_at"],
                name="workflow_ex_pipelin_126dbf_idx",
            ),
        ),
    ]
7 changes: 5 additions & 2 deletions backend/workflow_manager/workflow_v2/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from .execution import WorkflowExecution # noqa: F401
# isort:skip_file

# Do not change the order of the imports below to avoid circular dependency issues
from .workflow import Workflow # noqa: F401
from .execution_log import ExecutionLog # noqa: F401
from .execution import WorkflowExecution # noqa: F401
from .file_history import FileHistory # noqa: F401
from .workflow import Workflow # noqa: F401
45 changes: 45 additions & 0 deletions backend/workflow_manager/workflow_v2/models/execution.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import logging
import uuid
from typing import Optional

from api_v2.models import APIDeployment
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from pipeline_v2.models import Pipeline
from tags.models import Tag
from utils.models.base_model import BaseModel
from workflow_manager.workflow_v2.models import Workflow

logger = logging.getLogger(__name__)


EXECUTION_ERROR_LENGTH = 256

Expand Down Expand Up @@ -32,6 +41,7 @@ class Type(models.TextChoices):
null=True,
db_comment="task id of asynchronous execution",
)
# TODO: Make as foreign key to access the instance directly
workflow_id = models.UUIDField(
editable=False, db_comment="Id of workflow to be executed"
)
Expand Down Expand Up @@ -68,12 +78,47 @@ class Meta:
verbose_name = "Workflow Execution"
verbose_name_plural = "Workflow Executions"
db_table = "workflow_execution"
indexes = [
models.Index(fields=["workflow_id", "-created_at"]),
models.Index(fields=["pipeline_id", "-created_at"]),
]

@property
def tag_names(self) -> list[str]:
    """Names of every tag attached to this workflow execution."""
    names = self.tags.values_list("name", flat=True)
    return list(names)

@property
def workflow_name(self) -> Optional[str]:
    """Name of the workflow referenced by ``workflow_id``.

    Returns ``None`` and logs a warning when no workflow has that ID.
    """
    try:
        workflow = Workflow.objects.get(id=self.workflow_id)
    except ObjectDoesNotExist:
        logger.warning(
            f"Expected workflow ID '{self.workflow_id}' to exist but missing"
        )
        return None
    return workflow.workflow_name

@property
def pipeline_name(self) -> Optional[str]:
    """Name of the pipeline (ETL / TASK / API) tied to this execution.

    Returns ``None`` when the execution has no ``pipeline_id`` or when
    neither an API deployment nor a pipeline exists with that ID.
    """
    if not self.pipeline_id:
        return None

    # API deployments are consulted first, then pipelines; the first
    # match supplies the name.
    lookups = ((APIDeployment, "display_name"), (Pipeline, "pipeline_name"))
    for model, name_field in lookups:
        try:
            return getattr(model.objects.get(id=self.pipeline_id), name_field)
        except ObjectDoesNotExist:
            continue

    return None

def __str__(self) -> str:
return (
f"Workflow execution: {self.id} ("
Expand Down
5 changes: 5 additions & 0 deletions backend/workflow_manager/workflow_v2/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ def _process_file(
except Exception as e:
error = f"Error processing file '{os.path.basename(input_file)}'. {str(e)}"
execution_service.publish_log(error, level=LogLevel.ERROR)
workflow_file_execution.update_status(
status=ExecutionStatus.ERROR,
execution_error=error,
)
# Not propagating error here to continue execution for other files
execution_service.publish_update_log(
LogState.RUNNING,
f"Processing output for {file_name}",
Expand Down

0 comments on commit 71e8dd1

Please sign in to comment.