Skip to content

Commit

Permalink
fix: Avoid pipeline status update for API deployments (#398)
Browse files Browse the repository at this point in the history
* Avoid pipeline status update for API deployments

* Lockfile hash updated

* Skipping warning for API deployment

* Compare exception with isinstance()

* Minor fix

* Update backend/workflow_manager/workflow/workflow_helper.py

Signed-off-by: ali <[email protected]>

---------

Signed-off-by: ali <[email protected]>
Co-authored-by: Neha <[email protected]>
Co-authored-by: ali <[email protected]>
  • Loading branch information
3 people authored Jun 11, 2024
1 parent 697703d commit eae2e6b
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 14 deletions.
2 changes: 1 addition & 1 deletion backend/pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions backend/workflow_manager/workflow/models/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,13 @@ class Type(models.TextChoices):
execution_time = models.FloatField(
default=0, db_comment="execution time in seconds"
)

def __str__(self) -> str:
return (
f"Workflow execution: {self.id} ("
f"pipeline ID: {self.pipeline_id}, "
f"workflow iD: {self.workflow_id}, "
f"execution method: {self.execution_method}, "
f"status: {self.status}, "
f"error message: {self.error_message})"
)
42 changes: 30 additions & 12 deletions backend/workflow_manager/workflow/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,35 +267,53 @@ def run_workflow(
destination.validate()
# Execution Process
try:
updated_execution = WorkflowHelper.process_input_files(
workflow_execution = WorkflowHelper.process_input_files(
workflow,
source,
destination,
execution_service,
single_step=single_step,
hash_values_of_files=hash_values_of_files,
)
# TODO: Update through signals
WorkflowHelper._update_pipeline_status(
pipeline_id=pipeline_id, workflow_execution=workflow_execution
)
return ExecutionResponse(
str(workflow.id),
str(workflow_execution.id),
workflow_execution.status,
log_id=str(execution_service.execution_log_id),
error=workflow_execution.error_message,
mode=workflow_execution.execution_mode,
result=destination.api_results,
)
finally:
destination.delete_execution_directory()

@staticmethod
def _update_pipeline_status(
pipeline_id: Optional[str], workflow_execution: WorkflowExecution
) -> None:
try:
if pipeline_id:
# Update pipeline status
if updated_execution.status != ExecutionStatus.ERROR.value:
if workflow_execution.status != ExecutionStatus.ERROR.value:
PipelineProcessor.update_pipeline(
pipeline_id, Pipeline.PipelineStatus.SUCCESS
)
else:
PipelineProcessor.update_pipeline(
pipeline_id, Pipeline.PipelineStatus.FAILURE
)
return ExecutionResponse(
str(workflow.id),
str(updated_execution.id),
updated_execution.status,
log_id=str(execution_service.execution_log_id),
error=updated_execution.error_message,
mode=updated_execution.execution_mode,
result=destination.api_results,
# Expected exception since API deployments are not tracked in Pipeline
except Pipeline.DoesNotExist:
pass
except Exception as e:
logger.warning(
f"Error updating pipeline {pipeline_id} status: {e}, "
f"with workflow execution: {workflow_execution}"
)
finally:
destination.delete_execution_directory()

@staticmethod
def get_status_of_async_task(
Expand Down
2 changes: 1 addition & 1 deletion prompt-service/src/unstract/prompt_service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ def log_exceptions(e: HTTPException):
"""
code = 500
if hasattr(e, "code"):
code = e.code
code = e.code or code

if code >= 500:
message = "{method} {url} {status}\n\n{error}\n\n````{tb}````".format(
Expand Down

0 comments on commit eae2e6b

Please sign in to comment.