Skip to content

Commit

Permalink
Pipeline flow changed from asynchronous execution function (#659)
Browse files Browse the repository at this point in the history
* Pipeline flow changed from asynchronous execution function

* handle sync-now errors

* Update backend/scheduler/helper.py

Co-authored-by: Chandrasekharan M <[email protected]>
Signed-off-by: ali <[email protected]>

* pre-commit fix

---------

Signed-off-by: ali <[email protected]>
Co-authored-by: Chandrasekharan M <[email protected]>
  • Loading branch information
1 parent 13f1dbf commit c2f654b
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 28 deletions.
10 changes: 4 additions & 6 deletions backend/scheduler/helper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import uuid
from typing import Any

from django.db import connection
Expand All @@ -20,11 +19,11 @@
if check_feature_flag_status(FeatureFlag.MULTI_TENANCY_V2):
from pipeline_v2.models import Pipeline
from utils.user_context import UserContext
from workflow_manager.workflow_v2.constants import WorkflowExecutionKey, WorkflowKey
from workflow_manager.workflow_v2.constants import WorkflowKey
from workflow_manager.workflow_v2.serializers import ExecuteWorkflowSerializer
else:
from pipeline.models import Pipeline
from workflow_manager.workflow.constants import WorkflowExecutionKey, WorkflowKey
from workflow_manager.workflow.constants import WorkflowKey
from workflow_manager.workflow.serializers import ExecuteWorkflowSerializer
logger = logging.getLogger(__name__)

Expand All @@ -46,8 +45,6 @@ def _schedule_task_job(pipeline: Pipeline, job_data: Any) -> None:
task_data = job_kwargs.get("data", {})

task_data[WorkflowKey.WF_ID] = pipeline.workflow.id
execution_id = str(uuid.uuid4())
task_data[WorkflowExecutionKey.EXECUTION_ID] = execution_id
serializer = ExecuteWorkflowSerializer(data=task_data)
serializer.is_valid(raise_exception=True)
workflow_id = serializer.get_workflow_id(serializer.validated_data)
Expand All @@ -66,7 +63,8 @@ def _schedule_task_job(pipeline: Pipeline, job_data: Any) -> None:
str(workflow_id),
organization_id,
execution_action or "",
execution_id,
# TODO: execution_id parameter cannot be removed without a migration.
"",
str(pipeline.pk),
# Added to remain backward compatible - remove after data migration
# which removes unused args in execute_pipeline_task
Expand Down
7 changes: 2 additions & 5 deletions backend/scheduler/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ def execute_pipeline_task(
execute_pipeline_task_v2(
workflow_id=workflow_id,
organization_id=org_schema,
execution_id=execution_id,
pipeline_id=pipepline_id,
pipeline_name=name,
)
Expand Down Expand Up @@ -120,7 +119,7 @@ def execute_pipeline_task(
pipepline_id, Pipeline.PipelineStatus.INPROGRESS
)
execution_response = WorkflowHelper.complete_execution(
workflow, execution_id, pipepline_id
workflow=workflow, pipeline_id=pipepline_id
)
execution_response.remove_result_metadata_keys()
logger.info(f"Execution response: {execution_response}")
Expand All @@ -132,7 +131,6 @@ def execute_pipeline_task(
def execute_pipeline_task_v2(
workflow_id: Any,
organization_id: Any,
execution_id: Any,
pipeline_id: Any,
pipeline_name: Any,
) -> None:
Expand All @@ -141,7 +139,6 @@ def execute_pipeline_task_v2(
Args:
workflow_id (Any): UID of workflow entity
org_schema (Any): Organization Identifier
execution_id (Any): UID of execution entity
pipeline_id (Any): UID of pipeline entity
name (Any): pipeline name
"""
Expand Down Expand Up @@ -170,7 +167,7 @@ def execute_pipeline_task_v2(
pipeline_id, Pipeline.PipelineStatus.INPROGRESS
)
execution_response = WorkflowHelper.complete_execution(
workflow, execution_id, pipeline_id
workflow=workflow, pipeline_id=pipeline_id
)
execution_response.remove_result_metadata_keys()
logger.info(
Expand Down
28 changes: 27 additions & 1 deletion backend/workflow_manager/workflow/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ def __init__(

self.compilation_result = self.compile_workflow(execution_id=self.execution_id)

@staticmethod
@classmethod
def create_workflow_execution(
cls,
workflow_id: str,
pipeline_id: Optional[str] = None,
single_step: bool = False,
Expand All @@ -113,6 +114,11 @@ def create_workflow_execution(
execution_id: Optional[str] = None,
mode: tuple[str, str] = WorkflowExecution.Mode.INSTANT,
) -> WorkflowExecution:
# Validating with existing execution
existing_execution = cls.get_execution_instance_by_id(execution_id)
if existing_execution:
return existing_execution

execution_method: tuple[str, str] = (
WorkflowExecution.Method.SCHEDULED
if scheduled
Expand Down Expand Up @@ -168,6 +174,26 @@ def get_execution_instance(self) -> WorkflowExecution:
)
return execution

@classmethod
def get_execution_instance_by_id(
cls, execution_id: str
) -> Optional[WorkflowExecution]:
"""Get execution by execution ID.
Args:
execution_id (str): UID of execution entity
Returns:
Optional[WorkflowExecution]: WorkflowExecution Entity
"""
try:
execution: WorkflowExecution = WorkflowExecution.objects.get(
pk=execution_id
)
return execution
except WorkflowExecution.DoesNotExist:
return None

def build(self) -> None:
if self.compilation_result["success"] is True:
self.build_workflow()
Expand Down
3 changes: 3 additions & 0 deletions backend/workflow_manager/workflow/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
WorkflowRegenerationError,
)
from workflow_manager.workflow.generator import WorkflowGenerator
from workflow_manager.workflow.models.execution import WorkflowExecution
from workflow_manager.workflow.models.workflow import Workflow
from workflow_manager.workflow.serializers import (
ExecuteWorkflowResponseSerializer,
Expand Down Expand Up @@ -233,12 +234,14 @@ def execute_workflow(
workflow=workflow,
execution_id=execution_id,
pipeline_id=pipeline_guid,
execution_mode=WorkflowExecution.Mode.INSTANT,
hash_values_of_files=hash_values_of_files,
)
else:
execution_response = WorkflowHelper.complete_execution(
workflow=workflow,
execution_id=execution_id,
execution_mode=WorkflowExecution.Mode.INSTANT,
hash_values_of_files=hash_values_of_files,
)
return execution_response
Expand Down
58 changes: 42 additions & 16 deletions backend/workflow_manager/workflow/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,28 +545,54 @@ def complete_execution(
workflow: Workflow,
execution_id: Optional[str] = None,
pipeline_id: Optional[str] = None,
execution_mode: Optional[WorkflowExecution] = WorkflowExecution.Mode.QUEUE,
hash_values_of_files: dict[str, FileHash] = {},
) -> ExecutionResponse:
if pipeline_id:
logger.info(f"Executing pipeline: {pipeline_id}")
if not execution_id:
workflow_execution = (
WorkflowExecutionServiceHelper.create_workflow_execution(
workflow_id=workflow.id,
single_step=False,
pipeline_id=pipeline_id,
mode=WorkflowExecution.Mode.QUEUE,
execution_id=execution_id,
)
# Create a new WorkflowExecution entity for each pipeline execution.
# This ensures every pipeline run is tracked as a distinct execution.
workflow_execution = (
WorkflowExecutionServiceHelper.create_workflow_execution(
workflow_id=workflow.id,
single_step=False,
pipeline_id=pipeline_id,
mode=execution_mode,
)
execution_id = workflow_execution.id
response: ExecutionResponse = WorkflowHelper.execute_workflow_async(
workflow_id=workflow.id,
pipeline_id=pipeline_id,
execution_id=execution_id,
hash_values_of_files=hash_values_of_files,
)
return response
execution_id = workflow_execution.id
log_events_id = StateStore.get(Common.LOG_EVENTS_ID)
org_schema = connection.tenant.schema_name
if execution_mode == WorkflowExecution.Mode.INSTANT:
# Instant request from UX (Sync now in ETL and Workflow page)
response: ExecutionResponse = WorkflowHelper.execute_workflow_async(
workflow_id=workflow.id,
pipeline_id=pipeline_id,
execution_id=execution_id,
hash_values_of_files=hash_values_of_files,
)
return response
else:
execution_result = WorkflowHelper.execute_bin(
schema_name=org_schema,
workflow_id=workflow.id,
execution_id=workflow_execution.id,
hash_values_of_files=hash_values_of_files,
scheduled=True,
execution_mode=execution_mode,
pipeline_id=pipeline_id,
log_events_id=log_events_id,
)

updated_execution = WorkflowExecution.objects.get(id=execution_id)
execution_response = ExecutionResponse(
workflow.id,
execution_id,
updated_execution.status,
result=execution_result,
)
return execution_response

if execution_id is None:
# Creating execution entity and return
return WorkflowHelper.create_and_make_execution_response(
Expand Down

0 comments on commit c2f654b

Please sign in to comment.