diff --git a/backend/workflow_manager/workflow/urls.py b/backend/workflow_manager/workflow/urls.py index 7aa4c9925..648fe6513 100644 --- a/backend/workflow_manager/workflow/urls.py +++ b/backend/workflow_manager/workflow/urls.py @@ -27,6 +27,7 @@ {"get": "clear_file_marker"} ) workflow_schema = WorkflowViewSet.as_view({"get": "get_schema"}) +can_update = WorkflowViewSet.as_view({"get": "can_update"}) urlpatterns = format_suffix_patterns( [ path("", workflow_list, name="workflow-list"), @@ -41,6 +42,11 @@ workflow_clear_file_marker, name="clear-file-marker", ), + path( + "/can-update/", + can_update, + name="can-update", + ), path("execute/", workflow_execute, name="execute-workflow"), path( "active//", diff --git a/backend/workflow_manager/workflow/views.py b/backend/workflow_manager/workflow/views.py index 826c022c6..40ae2eb88 100644 --- a/backend/workflow_manager/workflow/views.py +++ b/backend/workflow_manager/workflow/views.py @@ -23,11 +23,11 @@ from workflow_manager.workflow.enums import SchemaEntity, SchemaType from workflow_manager.workflow.exceptions import ( InvalidRequest, + MissingEnvException, WorkflowDoesNotExistError, WorkflowExecutionError, WorkflowGenerationError, WorkflowRegenerationError, - MissingEnvException, ) from workflow_manager.workflow.generator import WorkflowGenerator from workflow_manager.workflow.models.workflow import Workflow @@ -248,9 +248,10 @@ def execute( logger.error(f"Error while executing workflow: {exception}") return Response( { - "error": "Please check the logs for more details: " + str(exception) + "error": "Please check the logs for more details: " + + str(exception) }, - status=status.HTTP_400_BAD_REQUEST + status=status.HTTP_400_BAD_REQUEST, ) except Exception as exception: logger.error(f"Error while executing workflow: {exception}") @@ -313,6 +314,11 @@ def clear_cache( ) return Response(response.get("message"), status=response.get("status")) + @action(detail=True, methods=["get"]) + def can_update(self, request: 
Request, pk: str) -> Response: + response: dict[str, Any] = WorkflowHelper.can_update_workflow(pk) + return Response(response, status=status.HTTP_200_OK) + @action(detail=True, methods=["get"]) def clear_file_marker( self, request: Request, *args: Any, **kwargs: Any diff --git a/backend/workflow_manager/workflow/workflow_helper.py b/backend/workflow_manager/workflow/workflow_helper.py index 1e0a720f2..cef6de655 100644 --- a/backend/workflow_manager/workflow/workflow_helper.py +++ b/backend/workflow_manager/workflow/workflow_helper.py @@ -7,6 +7,7 @@ from account.cache_service import CacheService from account.models import Organization +from api.models import APIDeployment from celery import current_task from celery import exceptions as celery_exceptions from celery import shared_task @@ -14,6 +15,7 @@ from django.conf import settings from django.db import IntegrityError, connection from django_tenants.utils import get_tenant_model, tenant_context +from pipeline.models import Pipeline from redis import StrictRedis from rest_framework import serializers from tool_instance.constants import ToolInstanceKey @@ -21,7 +23,6 @@ from tool_instance.tool_instance_helper import ToolInstanceHelper from unstract.workflow_execution.enums import LogComponent, LogState from unstract.workflow_execution.exceptions import StopExecution -from utils.request import feature_flag from workflow_manager.endpoint.destination import DestinationConnector from workflow_manager.endpoint.source import SourceConnector from workflow_manager.workflow.constants import ( @@ -202,10 +203,10 @@ def run_workflow( workflow_execution: Optional[WorkflowExecution] = None, execution_mode: Optional[tuple[str, str]] = None, ) -> ExecutionResponse: - tool_instances: list[ - ToolInstance - ] = ToolInstanceHelper.get_tool_instances_by_workflow( - workflow.id, ToolInstanceKey.STEP + tool_instances: list[ToolInstance] = ( + ToolInstanceHelper.get_tool_instances_by_workflow( + workflow.id, ToolInstanceKey.STEP + ) ) 
execution_mode = execution_mode or WorkflowExecution.Mode.INSTANT execution_service = WorkflowHelper.build_workflow_execution_service( @@ -652,6 +653,22 @@ def make_async_result(obj: AsyncResult) -> dict[str, Any]: "info": obj.info, } + @staticmethod + def can_update_workflow(workflow_id: str) -> dict[str, Any]: + try: + workflow: Workflow = Workflow.objects.get(pk=workflow_id) + if not workflow or workflow is None: + raise WorkflowDoesNotExistError() + used_count = Pipeline.objects.filter(workflow=workflow).count() + if used_count == 0: + used_count = APIDeployment.objects.filter( + workflow=workflow + ).count() + return {"can_update": used_count == 0} + except Workflow.DoesNotExist: + logger.error(f"Error getting workflow: {workflow_id}") + raise WorkflowDoesNotExistError() + class WorkflowSchemaHelper: """Helper class for workflow schema related methods.""" diff --git a/frontend/src/components/agency/actions/Actions.jsx b/frontend/src/components/agency/actions/Actions.jsx index 1f8f22920..216f18db4 100644 --- a/frontend/src/components/agency/actions/Actions.jsx +++ b/frontend/src/components/agency/actions/Actions.jsx @@ -54,13 +54,27 @@ function Actions({ statusBarMsg, initializeWfComp, stepLoader }) { const axiosPrivate = useAxiosPrivate(); useEffect(() => { - setApiOpsPresent(source?.connection_type === "API"); - }, [source]); - - useEffect(() => { - setCanAddTaskPipeline(destination?.connection_type === "FILESYSTEM"); - setCanAddETAPipeline(destination?.connection_type === "DATABASE"); - }, [destination]); + // Enable Deploy as API only when + // Source & Destination connection_type are selected as API + setApiOpsPresent( + source?.connection_type === "API" && + destination?.connection_type === "API" + ); + // Enable Deploy as Task Pipeline only when + // destination connection_type is FILESYSTEM and Source & Destination are Configured + setCanAddTaskPipeline( + destination?.connection_type === "FILESYSTEM" && + source?.connector_instance && + 
destination?.connector_instance + ); + // Enable Deploy as ETL Pipeline only when + // destination connection_type is DATABASE and Source & Destination are Configured + setCanAddETAPipeline( + destination?.connection_type === "DATABASE" && + source?.connector_instance && + destination?.connector_instance + ); + }, [source, destination]); useEffect(() => { if (stepExecType === wfExecutionTypes[1]) { diff --git a/frontend/src/components/agency/ds-settings-card/DsSettingsCard.jsx b/frontend/src/components/agency/ds-settings-card/DsSettingsCard.jsx index 7f5f55bec..fdce7fac5 100644 --- a/frontend/src/components/agency/ds-settings-card/DsSettingsCard.jsx +++ b/frontend/src/components/agency/ds-settings-card/DsSettingsCard.jsx @@ -40,10 +40,16 @@ const inputOptions = [ value: "FILESYSTEM", label: "File System", }, + { + value: "DATABASE", + label: "Database", + }, ]; -function DsSettingsCard({ type, endpointDetails, message, dependent }) { - const [options, setOptions] = useState([...inputOptions]); +function DsSettingsCard({ type, endpointDetails, message }) { + const workflowStore = useWorkflowStore(); + const { source, destination, allowChangeEndpoint } = workflowStore; + const [options, setOptions] = useState([]); const [openModal, setOpenModal] = useState(false); const [listOfConnectors, setListOfConnectors] = useState([]); @@ -66,6 +72,30 @@ function DsSettingsCard({ type, endpointDetails, message, dependent }) { output: , }; + useEffect(() => { + if (type === "output") { + if (source?.connection_type === "") { + // Clear options when source is blank + setOptions([]); + } else { + // Filter options based on source connection type + const filteredOptions = ["API"].includes(source?.connection_type) + ? 
inputOptions.filter((option) => option.value === "API") + : inputOptions.filter((option) => option.value !== "API"); + + setOptions(filteredOptions); + } + } + + if (type === "input") { + // Remove Database from Source Dropdown + const filteredOptions = inputOptions.filter( + (option) => option.value !== "DATABASE" + ); + setOptions(filteredOptions); + } + }, [source, destination]); + useEffect(() => { if (endpointDetails?.connection_type !== connType) { setConnType(endpointDetails?.connection_type); @@ -83,21 +113,6 @@ function DsSettingsCard({ type, endpointDetails, message, dependent }) { getSourceDetails(); }, [endpointDetails]); - useEffect(() => { - if (type === "output") { - setOptions(() => { - const newOptions = [...inputOptions]; - newOptions.push({ - value: "DATABASE", - label: "Database", - }); - return newOptions; - }); - return; - } - setOptions([...inputOptions]); - }, [type]); - useEffect(() => { const menuItems = []; [...listOfConnectors].forEach((item) => { @@ -174,6 +189,41 @@ function DsSettingsCard({ type, endpointDetails, message, dependent }) { }; }; + const clearDestination = (updatedData) => { + const body = { ...destination, ...updatedData }; + + const requestOptions = { + method: "PUT", + url: `/api/v1/unstract/${sessionDetails?.orgId}/workflow/endpoint/${destination?.id}/`, + headers: { + "X-CSRFToken": sessionDetails?.csrfToken, + "Content-Type": "application/json", + }, + data: body, + }; + + axiosPrivate(requestOptions) + .then((res) => { + const data = res?.data || {}; + const updatedData = {}; + updatedData["destination"] = data; + updateWorkflow(updatedData); + }) + .catch((err) => { + setAlertDetails(handleException(err, "Failed to update")); + }); + }; + + const updateDestination = () => { + // Clear destination dropdown & data when input is switched + if (type === "input") { + clearDestination({ + connection_type: "", + connector_instance: null, + }); + } + }; + const handleUpdate = (updatedData, showSuccess) => { const body = { 
...endpointDetails, ...updatedData }; @@ -237,19 +287,29 @@ function DsSettingsCard({ type, endpointDetails, message, dependent }) { - { + handleUpdate({ + connection_type: value, + connector_instance: null, + }); + updateDestination(); + }} + /> + + { getWfEndpointDetails(); + canUpdateWorkflow(); }, []); + const canUpdateWorkflow = () => { + const requestOptions = { + method: "GET", + url: `/api/v1/unstract/${sessionDetails?.orgId}/workflow/${projectId}/can-update/`, + }; + axiosPrivate(requestOptions) + .then((res) => { + const data = res?.data || {}; + const body = { + allowChangeEndpoint: data?.can_update, + }; + updateWorkflow(body); + }) + .catch((err) => { + setAlertDetails( + handleException(err, "Failed to get can update status") + ); + }); + }; + const moveItem = (fromIndex, toIndex, funcName, dragging) => { if (fromIndex === undefined && funcName) { handleAddToolInstance(funcName) @@ -158,7 +179,6 @@ function Steps({ steps, setSteps, activeToolId, sourceMsg, destinationMsg }) { type={sourceTypes.connectors[0]} endpointDetails={source} message={sourceMsg} - dependent={destination} /> @@ -185,7 +205,6 @@ function Steps({ steps, setSteps, activeToolId, sourceMsg, destinationMsg }) { type={sourceTypes.connectors[1]} endpointDetails={destination} message={destinationMsg} - dependent={source} /> diff --git a/frontend/src/components/deployments/create-api-deployment-modal/CreateApiDeploymentModal.jsx b/frontend/src/components/deployments/create-api-deployment-modal/CreateApiDeploymentModal.jsx index 0dbec8f8f..9214923c6 100644 --- a/frontend/src/components/deployments/create-api-deployment-modal/CreateApiDeploymentModal.jsx +++ b/frontend/src/components/deployments/create-api-deployment-modal/CreateApiDeploymentModal.jsx @@ -8,6 +8,7 @@ import { } from "../../../helpers/GetStaticData.js"; import { useAlertStore } from "../../../store/alert-store"; import { apiDeploymentsService } from "../../deployments/api-deployment/api-deployments-service.js"; +import { 
useWorkflowStore } from "../../../store/workflow-store.js"; const defaultFromDetails = { display_name: "", @@ -27,6 +28,8 @@ const CreateApiDeploymentModal = ({ workflowId, workflowEndpointList, }) => { + const workflowStore = useWorkflowStore(); + const { updateWorkflow } = workflowStore; const apiDeploymentsApiService = apiDeploymentsService(); const { setAlertDetails } = useAlertStore(); @@ -97,7 +100,10 @@ const CreateApiDeploymentModal = ({ apiDeploymentsApiService .createApiDeployment(body) .then((res) => { - if (!workflowId) { + if (workflowId) { + // Update - can update workflow endpoint status in store + updateWorkflow({ allowChangeEndpoint: false }); + } else { updateTableData(); setSelectedRow(res?.data); openCodeModal(true); diff --git a/frontend/src/components/pipelines-or-deployments/etl-task-deploy/EtlTaskDeploy.jsx b/frontend/src/components/pipelines-or-deployments/etl-task-deploy/EtlTaskDeploy.jsx index 49bf2c470..54c20df3b 100644 --- a/frontend/src/components/pipelines-or-deployments/etl-task-deploy/EtlTaskDeploy.jsx +++ b/frontend/src/components/pipelines-or-deployments/etl-task-deploy/EtlTaskDeploy.jsx @@ -14,6 +14,7 @@ import SpaceWrapper from "../../widgets/space-wrapper/SpaceWrapper.jsx"; import { SpinnerLoader } from "../../widgets/spinner-loader/SpinnerLoader.jsx"; import { workflowService } from "../../workflows/workflow/workflow-service.js"; import "./EtlTaskDeploy.css"; +import { useWorkflowStore } from "../../../store/workflow-store.js"; const days = [ "Monday", @@ -39,6 +40,8 @@ const EtlTaskDeploy = ({ setTableData, workflowId, }) => { + const workflowStore = useWorkflowStore(); + const { updateWorkflow } = workflowStore; const { sessionDetails } = useSessionStore(); const { setAlertDetails } = useAlertStore(); const axiosPrivate = useAxiosPrivate(); @@ -263,7 +266,10 @@ const EtlTaskDeploy = ({ setLoading(true); axiosPrivate(requestOptions) .then((res) => { - if (!workflowId) { + if (workflowId) { + // Update - can update workflow 
endpoint status in store + updateWorkflow({ allowChangeEndpoint: false }); + } else { addPipeline(res?.data); } setOpen(false); diff --git a/frontend/src/components/workflows/workflow/Workflows.jsx b/frontend/src/components/workflows/workflow/Workflows.jsx index 14330647b..f03b9b8b0 100644 --- a/frontend/src/components/workflows/workflow/Workflows.jsx +++ b/frontend/src/components/workflows/workflow/Workflows.jsx @@ -203,17 +203,41 @@ function Workflows() { }); } - function deleteProject(evt, project) { + const canDeleteProject = async (id) => { + let status = false; + await projectApiService.canUpdate(id).then((res) => { + status = res?.data?.can_update || false; + }); + return status; + }; + + const deleteProject = async (evt, project) => { evt.stopPropagation(); - projectApiService - .deleteProject(project.id) - .then(() => { - getProjectList(); - }) - .catch(() => { - console.error(`Unable to delete workflow ${project.id}`); + const canDelete = await canDeleteProject(project.id); + if (canDelete) { + projectApiService + .deleteProject(project.id) + .then(() => { + getProjectList(); + setAlertDetails({ + type: "success", + content: "Workflow deleted successfully", + }); + }) + .catch(() => { + setAlertDetails({ + type: "error", + content: `Unable to delete workflow ${project.id}`, + }); + }); + } else { + setAlertDetails({ + type: "error", + content: + "Cannot delete this Workflow, since it is used in one or many of the API/ETL/Task pipelines", }); - } + } + }; function closeNewProject() { setEditProject(); diff --git a/frontend/src/components/workflows/workflow/workflow-service.js b/frontend/src/components/workflows/workflow/workflow-service.js index 311fdeeda..494efe4b2 100644 --- a/frontend/src/components/workflows/workflow/workflow-service.js +++ b/frontend/src/components/workflows/workflow/workflow-service.js @@ -78,6 +78,13 @@ function workflowService() { }; return axiosPrivate(options); }, + canUpdate: (id) => { + options = { + url: 
`${path}/workflow/${id}/can-update/`, + method: "GET", + }; + return axiosPrivate(options); + }, + }; } diff --git a/frontend/src/store/workflow-store.js b/frontend/src/store/workflow-store.js index 516d090bd..c9dcf2a7d 100644 --- a/frontend/src/store/workflow-store.js +++ b/frontend/src/store/workflow-store.js @@ -10,6 +10,7 @@ const defaultState = { source: {}, destination: {}, details: {}, + allowChangeEndpoint: true, }; const STORE_VARIABLES = { ...defaultState };