diff --git a/backend/workflow_manager/workflow_v2/execution.py b/backend/workflow_manager/workflow_v2/execution.py index b3afceb03..24cb13721 100644 --- a/backend/workflow_manager/workflow_v2/execution.py +++ b/backend/workflow_manager/workflow_v2/execution.py @@ -104,7 +104,7 @@ def __init__( f"workflow ID: {self.workflow_id}, execution ID: {self.execution_id}, " f"web socket messaging channel ID: {self.execution_log_id}" ) - + self.execution_attempt = 1 self.compilation_result = self.compile_workflow(execution_id=self.execution_id) @classmethod @@ -166,7 +166,7 @@ def update_execution( execution.error_message = error[:EXECUTION_ERROR_LENGTH] if increment_attempt: execution.attempts += 1 - + self.execution_attempt = execution.attempts execution.save() def has_successful_compilation(self) -> bool: @@ -240,6 +240,7 @@ def execute( file_execution_id=file_execution_id, file_name=file_name, execution_type=execution_type, + execution_attempt=self.execution_attempt, ) end_time = time.time() execution_time = end_time - start_time diff --git a/runner/src/unstract/runner/clients/docker.py b/runner/src/unstract/runner/clients/docker.py index 5676423b5..8cac6c153 100644 --- a/runner/src/unstract/runner/clients/docker.py +++ b/runner/src/unstract/runner/clients/docker.py @@ -165,6 +165,7 @@ def get_container_run_config( run_id: str, envs: Optional[dict[str, Any]] = None, auto_remove: bool = False, + execution_attempt: int = 1, ) -> dict[str, Any]: if envs is None: envs = {} @@ -192,7 +193,10 @@ def get_container_run_config( ) return { "name": UnstractUtils.build_tool_container_name( - tool_image=self.image_name, tool_version=self.image_tag, run_id=run_id + tool_image=self.image_name, + tool_version=self.image_tag, + run_id=run_id, + execution_attempt=execution_attempt, ), "image": self.get_image(), "command": command, diff --git a/runner/src/unstract/runner/clients/interface.py b/runner/src/unstract/runner/clients/interface.py index 231a573d8..0b22a39d5 100644 --- a/runner/src/unstract/runner/clients/interface.py +++ b/runner/src/unstract/runner/clients/interface.py @@ -68,6 +68,7 @@ def get_container_run_config( run_id: str, envs: Optional[dict[str, Any]] = None, auto_remove: bool = False, + execution_attempt: int = 1, ) -> dict[str, Any]: """Generate the configuration dictionary to run the container. diff --git a/runner/src/unstract/runner/main.py b/runner/src/unstract/runner/main.py index 3caa480d4..b4005fa76 100644 --- a/runner/src/unstract/runner/main.py +++ b/runner/src/unstract/runner/main.py @@ -32,6 +32,7 @@ def run_container() -> Optional[Any]: settings = data["settings"] envs = data["envs"] messaging_channel = data["messaging_channel"] + execution_attempt = data.get("execution_attempt", 1) runner = UnstractRunner(image_name, image_tag, app) result = runner.run_container( @@ -42,6 +43,7 @@ def run_container() -> Optional[Any]: settings=settings, envs=envs, messaging_channel=messaging_channel, + execution_attempt=execution_attempt, ) return result diff --git a/runner/src/unstract/runner/runner.py b/runner/src/unstract/runner/runner.py index eda4c6528..9fbe691de 100644 --- a/runner/src/unstract/runner/runner.py +++ b/runner/src/unstract/runner/runner.py @@ -146,6 +146,7 @@ def run_command(self, command: str) -> Optional[Any]: execution_id="", run_id="", auto_remove=True, + execution_attempt=1, ) container = None @@ -172,6 +173,7 @@ def run_container( run_id: str, settings: dict[str, Any], envs: dict[str, Any], + execution_attempt: int, messaging_channel: Optional[str] = None, ) -> Optional[Any]: """RUN container With RUN Command. @@ -181,6 +183,7 @@ def run_container( params (dict[str, Any]): params to run the tool settings (dict[str, Any]): Tool settings envs (dict[str, Any]): Tool env + execution_attempt (int): The current execution attempt number. messaging_channel (Optional[str], optional): socket io channel Returns: @@ -214,6 +217,7 @@ def run_container( execution_id=execution_id, run_id=run_id, envs=envs, + execution_attempt=execution_attempt, ) # Add labels to container for logging with Loki. # This only required for observability. diff --git a/unstract/core/src/unstract/core/utilities.py b/unstract/core/src/unstract/core/utilities.py index 89213e49f..367bae7ae 100644 --- a/unstract/core/src/unstract/core/utilities.py +++ b/unstract/core/src/unstract/core/utilities.py @@ -29,9 +29,10 @@ def get_env(env_key: str, default: Optional[str] = None, raise_err=False) -> str @staticmethod def build_tool_container_name( - tool_image: str, tool_version: str, run_id: str + tool_image: str, tool_version: str, run_id: str, execution_attempt: int ) -> str: - container_name = f"{tool_image.split('/')[-1]}-{tool_version}-{run_id}" + tool_name = tool_image.split("/")[-1] + container_name = f"{tool_name}-{tool_version}-{execution_attempt}-{run_id}" # To support limits of container clients like K8s if len(container_name) > 63: diff --git a/unstract/tool-sandbox/src/unstract/tool_sandbox/helper.py b/unstract/tool-sandbox/src/unstract/tool_sandbox/helper.py index d52a95443..d86329490 100644 --- a/unstract/tool-sandbox/src/unstract/tool_sandbox/helper.py +++ b/unstract/tool-sandbox/src/unstract/tool_sandbox/helper.py @@ -73,6 +73,7 @@ def call_tool_handler( image_name: str, image_tag: str, settings: dict[str, Any], + execution_attempt: int, ) -> Optional[dict[str, Any]]: """Calling unstract runner to run the required tool. @@ -91,6 +92,7 @@ def call_tool_handler( image_name, image_tag, settings, + execution_attempt, ) response = requests.post(url, json=data) @@ -114,6 +116,7 @@ def create_tool_request_data( image_name: str, image_tag: str, settings: dict[str, Any], + execution_attempt: int, ) -> dict[str, Any]: data = { "image_name": image_name, @@ -125,5 +128,6 @@ def create_tool_request_data( "settings": settings, "envs": self.envs, "messaging_channel": self.messaging_channel, + "execution_attempt": execution_attempt, } return data diff --git a/unstract/tool-sandbox/src/unstract/tool_sandbox/tool_sandbox.py b/unstract/tool-sandbox/src/unstract/tool_sandbox/tool_sandbox.py index c9a277843..9dee4b08a 100644 --- a/unstract/tool-sandbox/src/unstract/tool_sandbox/tool_sandbox.py +++ b/unstract/tool-sandbox/src/unstract/tool_sandbox/tool_sandbox.py @@ -93,10 +93,20 @@ def get_variables(self) -> Optional[dict[str, Any]]: ) return result - def run_tool(self, run_id: str) -> Optional[dict[str, Any]]: + def run_tool(self, run_id: str, execution_attempt: int) -> Optional[dict[str, Any]]: + """Run tool by handler + + Args: + run_id (str): Run_id for the execution + execution_attempt (int): Current execution attempt + + Returns: + Optional[dict[str, Any]]: _description_ + """ return self.helper.call_tool_handler( # type: ignore run_id, self.image_name, self.image_tag, self.settings, + execution_attempt, ) diff --git a/unstract/workflow-execution/src/unstract/workflow_execution/tools_utils.py b/unstract/workflow-execution/src/unstract/workflow_execution/tools_utils.py index 60ffccb85..3bf5dd49a 100644 --- a/unstract/workflow-execution/src/unstract/workflow_execution/tools_utils.py +++ b/unstract/workflow-execution/src/unstract/workflow_execution/tools_utils.py @@ -175,20 +175,24 @@ def check_tools_are_available(self, tool_ids: list[str]) -> bool: def run_tool( self, run_id: str, + execution_attempt: int, tool_sandbox: ToolSandbox, ) -> Any: - return self.run_tool_with_retry(run_id, tool_sandbox) + return self.run_tool_with_retry(run_id, tool_sandbox, execution_attempt) def run_tool_with_retry( self, run_id: str, tool_sandbox: ToolSandbox, + execution_attempt: int, max_retries: int = ToolExecution.MAXIMUM_RETRY, ) -> Any: error: Optional[dict[str, Any]] = None for retry_count in range(max_retries): try: - response = tool_sandbox.run_tool(run_id) + response = tool_sandbox.run_tool( + run_id=run_id, execution_attempt=execution_attempt + ) if response: return response logger.warning( diff --git a/unstract/workflow-execution/src/unstract/workflow_execution/workflow_execution.py b/unstract/workflow-execution/src/unstract/workflow_execution/workflow_execution.py index 39b94fd53..bb9ad6cf3 100644 --- a/unstract/workflow-execution/src/unstract/workflow_execution/workflow_execution.py +++ b/unstract/workflow-execution/src/unstract/workflow_execution/workflow_execution.py @@ -140,7 +140,11 @@ def build_workflow(self) -> None: logger.info(f"Execution {self.execution_id}: Build completed") def execute_workflow( - self, file_execution_id: str, file_name: str, execution_type: ExecutionType + self, + file_execution_id: str, + file_name: str, + execution_type: ExecutionType, + execution_attempt: int, ) -> None: """Executes the complete workflow by running each tools one by one. Returns the result from final tool in a dictionary. @@ -149,6 +153,7 @@ def execute_workflow( file_execution_id (str): UUID for a single run of a file file_name (str): Name of the file to process execution_type (ExecutionType): STEP or COMPLETE + execution_attempt: The current attempt number for this execution Raises: BadRequestException: In case someone tries to execute a workflow @@ -172,6 +177,7 @@ def execute_workflow( tool_image=sandbox.image_name, tool_version=sandbox.image_tag, run_id=file_execution_id, + execution_attempt=execution_attempt, ) logger.info( f"Running execution: '{self.execution_id}', " @@ -179,14 +185,14 @@ def execute_workflow( f"file '{file_name}', container: '{container_name}'" ) self._execute_step( - step=step, - sandbox=sandbox, + step=step, sandbox=sandbox, execution_attempt=execution_attempt ) self._finalize_execution(execution_type) def _execute_step( self, step: int, + execution_attempt: int, sandbox: ToolSandbox, ) -> None: """Execution of workflow step. @@ -195,6 +201,7 @@ def _execute_step( step (int): workflow step sandbox (ToolSandbox): instance of tool sandbox execution_type (ExecutionType): step or complete + execution_attempt: The current attempt number for this execution last_step_output (list[Any]): output of previous step Raises: @@ -224,7 +231,9 @@ def _execute_step( component=tool_instance_id, ) result = self.tool_utils.run_tool( - run_id=self.file_execution_id, tool_sandbox=sandbox + run_id=self.file_execution_id, + tool_sandbox=sandbox, + execution_attempt=execution_attempt, ) if result and result.get("error"): raise ToolOutputNotFoundException(result.get("error"))