Skip to content

Commit

Permalink
fix: Pipeline manual sync fix, avoids executing twice (#355)
Browse files Browse the repository at this point in the history
* Pipeline manual sync fix, avoids executing twice

* Removed unused function

* Reverted spacewrapper change in FE, pipeline status sync error handling changes

* Removed unused import
  • Loading branch information
chandrasekharan-zipstack authored May 28, 2024
1 parent 42939b9 commit 68c835a
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 34 deletions.
2 changes: 1 addition & 1 deletion backend/workflow_manager/workflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class WorkflowRegenerationError(APIException):


class WorkflowExecutionError(APIException):
status_code = 400
status_code = 500
default_detail = "Error executing workflow."


Expand Down
30 changes: 7 additions & 23 deletions backend/workflow_manager/workflow/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,6 @@ def update_execution(

execution.save()

def __execution_error(
self,
error_message: str,
detailed_message: str,
execution_time: Optional[float] = None,
) -> WorkflowExecutionError:
return WorkflowExecutionError(detail=error_message)

def has_successful_compilation(self) -> bool:
return self.compilation_result["success"] is True

Expand All @@ -195,10 +187,7 @@ def build(self) -> None:
status=ExecutionStatus.ERROR,
error=self.compilation_result["problems"][0],
)
raise self.__execution_error(
error_message=self.compilation_result["problems"][0],
detailed_message=self.compilation_result["problems"][0],
)
raise WorkflowExecutionError(self.compilation_result["problems"][0])

def execute(self, single_step: bool = False) -> None:
execution_type = ExecutionType.COMPLETE
Expand All @@ -224,23 +213,18 @@ def execute(self, single_step: bool = False) -> None:
end_time = time.time()
execution_time = end_time - start_time
message = str(exception)[:EXECUTION_ERROR_LENGTH]
logger.info(f"Execution {self.execution_id} Error {exception}")
raise self.__execution_error(
error_message=message,
detailed_message=message,
execution_time=execution_time,
logger.info(
f"Execution {self.execution_id} in {execution_time}s, "
f" Error {exception}"
)
raise WorkflowExecutionError(message) from exception
else:
error_message = f"Unknown Execution Method {self.execution_mode}"
raise self.__execution_error(
error_message=error_message, detailed_message=error_message
)
raise WorkflowExecutionError(error_message)

else:
error_message = f"Errors while compiling workflow {self.compilation_result['problems'][0]}" # noqa
raise self.__execution_error(
error_message=error_message, detailed_message=error_message
)
raise WorkflowExecutionError(error_message)

def publish_initial_workflow_logs(self, total_files: int) -> None:
"""Publishes the initial logs for the workflow.
Expand Down
1 change: 0 additions & 1 deletion backend/workflow_manager/workflow/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ def execute_workflow(
pipeline_id=pipeline_guid,
hash_values_of_files=hash_values_of_files,
)
update_pipeline(pipeline_guid, Pipeline.PipelineStatus.SUCCESS)
else:
execution_response = WorkflowHelper.complete_execution(
workflow=workflow,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import {
SyncOutlined,
HighlightOutlined,
FileSearchOutlined,
ReloadOutlined,
} from "@ant-design/icons";
import { Dropdown, Image, Space, Switch, Typography } from "antd";
import { Button, Dropdown, Image, Space, Switch, Typography } from "antd";
import PropTypes from "prop-types";
import { useEffect, useState } from "react";
import cronstrue from "cronstrue";
Expand Down Expand Up @@ -78,11 +79,6 @@ function Pipelines({ type }) {
handleLoaderInTableData(fieldsToUpdate, pipelineId);

handleSyncApiReq(body)
.then((res) => {
const executionId = res?.data?.execution?.execution_id;
body["execution_id"] = executionId;
return handleSyncApiReq(body);
})
.then((res) => {
const data = res?.data?.pipeline;
fieldsToUpdate["last_run_status"] = data?.last_run_status;
Expand All @@ -103,6 +99,31 @@ function Pipelines({ type }) {
});
};

const handleStatusRefresh = (pipelineId) => {
const fieldsToUpdate = {
last_run_status: "processing",
};
handleLoaderInTableData(fieldsToUpdate, pipelineId);

getPipelineData(pipelineId)
.then((res) => {
const data = res?.data;
fieldsToUpdate["last_run_status"] = data?.last_run_status;
fieldsToUpdate["last_run_time"] = data?.last_run_time;
})
.catch((err) => {
setAlertDetails(
handleException(err, `Failed to update pipeline status.`)
);
const date = new Date();
fieldsToUpdate["last_run_status"] = "FAILURE";
fieldsToUpdate["last_run_time"] = date.toISOString();
})
.finally(() => {
handleLoaderInTableData(fieldsToUpdate, pipelineId);
});
};

const handleLoaderInTableData = (updatedFields, pipelineId) => {
const filteredData = tableData.map((item) => {
if (item.id === pipelineId) {
Expand Down Expand Up @@ -180,6 +201,21 @@ function Pipelines({ type }) {
});
};

const getPipelineData = (pipelineId) => {
const requestOptions = {
method: "GET",
url: `/api/v1/unstract/${sessionDetails?.orgId}/pipeline/${pipelineId}/`,
headers: {
"X-CSRFToken": sessionDetails?.csrfToken,
},
};
return axiosPrivate(requestOptions)
.then((res) => res)
.catch((err) => {
throw err;
});
};

const fetchExecutionLogs = (page = 1, pageSize = 10) => {
const requestOptions = {
method: "GET",
Expand Down Expand Up @@ -400,9 +436,17 @@ function Pipelines({ type }) {
{record.last_run_status === "processing" ? (
<SpinnerLoader />
) : (
<Typography.Text className="p-or-d-typography" strong>
{record?.last_run_status}
</Typography.Text>
<Space>
<Typography.Text className="p-or-d-typography" strong>
{record?.last_run_status}
</Typography.Text>
<Button
icon={<ReloadOutlined />}
type="text"
size="small"
onClick={() => handleStatusRefresh(record?.id)}
/>
</Space>
)}
</>
),
Expand Down

0 comments on commit 68c835a

Please sign in to comment.