Skip to content

Commit

Permalink
fix for duplicate container while retrying executions
Browse files Browse the repository at this point in the history
  • Loading branch information
muhammad-ali-e committed Jan 27, 2025
1 parent 4254402 commit 02226d7
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 12 deletions.
5 changes: 3 additions & 2 deletions backend/workflow_manager/workflow_v2/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def __init__(
f"workflow ID: {self.workflow_id}, execution ID: {self.execution_id}, "
f"web socket messaging channel ID: {self.execution_log_id}"
)

self.execution_attempt = 1
self.compilation_result = self.compile_workflow(execution_id=self.execution_id)

@classmethod
Expand Down Expand Up @@ -166,7 +166,7 @@ def update_execution(
execution.error_message = error[:EXECUTION_ERROR_LENGTH]
if increment_attempt:
execution.attempts += 1

self.execution_attempt = execution.attempts
execution.save()

def has_successful_compilation(self) -> bool:
Expand Down Expand Up @@ -240,6 +240,7 @@ def execute(
file_execution_id=file_execution_id,
file_name=file_name,
execution_type=execution_type,
execution_attempt=self.execution_attempt,
)
end_time = time.time()
execution_time = end_time - start_time
Expand Down
6 changes: 5 additions & 1 deletion runner/src/unstract/runner/clients/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def get_container_run_config(
run_id: str,
envs: Optional[dict[str, Any]] = None,
auto_remove: bool = False,
execution_attempt: int = 1,
) -> dict[str, Any]:
if envs is None:
envs = {}
Expand Down Expand Up @@ -192,7 +193,10 @@ def get_container_run_config(
)
return {
"name": UnstractUtils.build_tool_container_name(
tool_image=self.image_name, tool_version=self.image_tag, run_id=run_id
tool_image=self.image_name,
tool_version=self.image_tag,
run_id=run_id,
execution_attempt=execution_attempt,
),
"image": self.get_image(),
"command": command,
Expand Down
1 change: 1 addition & 0 deletions runner/src/unstract/runner/clients/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def get_container_run_config(
run_id: str,
envs: Optional[dict[str, Any]] = None,
auto_remove: bool = False,
execution_attempt: int = 1,
) -> dict[str, Any]:
"""Generate the configuration dictionary to run the container.
Expand Down
2 changes: 2 additions & 0 deletions runner/src/unstract/runner/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def run_container() -> Optional[Any]:
settings = data["settings"]
envs = data["envs"]
messaging_channel = data["messaging_channel"]
execution_attempt = data.get("execution_attempt", 1)

runner = UnstractRunner(image_name, image_tag, app)
result = runner.run_container(
Expand All @@ -42,6 +43,7 @@ def run_container() -> Optional[Any]:
settings=settings,
envs=envs,
messaging_channel=messaging_channel,
execution_attempt=execution_attempt,
)
return result

Expand Down
4 changes: 4 additions & 0 deletions runner/src/unstract/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def run_command(self, command: str) -> Optional[Any]:
execution_id="",
run_id="",
auto_remove=True,
execution_attempt=1,
)
container = None

Expand All @@ -172,6 +173,7 @@ def run_container(
run_id: str,
settings: dict[str, Any],
envs: dict[str, Any],
execution_attempt: int,
messaging_channel: Optional[str] = None,
) -> Optional[Any]:
"""RUN container With RUN Command.
Expand All @@ -181,6 +183,7 @@ def run_container(
params (dict[str, Any]): params to run the tool
settings (dict[str, Any]): Tool settings
envs (dict[str, Any]): Tool env
execution_attempt (int): The current execution attempt number.
messaging_channel (Optional[str], optional): socket io channel
Returns:
Expand Down Expand Up @@ -214,6 +217,7 @@ def run_container(
execution_id=execution_id,
run_id=run_id,
envs=envs,
execution_attempt=execution_attempt,
)
# Add labels to container for logging with Loki.
# This only required for observability.
Expand Down
5 changes: 3 additions & 2 deletions unstract/core/src/unstract/core/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ def get_env(env_key: str, default: Optional[str] = None, raise_err=False) -> str

@staticmethod
def build_tool_container_name(
tool_image: str, tool_version: str, run_id: str
tool_image: str, tool_version: str, run_id: str, execution_attempt: int
) -> str:
container_name = f"{tool_image.split('/')[-1]}-{tool_version}-{run_id}"
tool_name = tool_image.split("/")[-1]
container_name = f"{tool_name}-{tool_version}-{execution_attempt}-{run_id}"

# To support limits of container clients like K8s
if len(container_name) > 63:
Expand Down
4 changes: 4 additions & 0 deletions unstract/tool-sandbox/src/unstract/tool_sandbox/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def call_tool_handler(
image_name: str,
image_tag: str,
settings: dict[str, Any],
execution_attempt: int,
) -> Optional[dict[str, Any]]:
"""Calling unstract runner to run the required tool.
Expand All @@ -91,6 +92,7 @@ def call_tool_handler(
image_name,
image_tag,
settings,
execution_attempt,
)

response = requests.post(url, json=data)
Expand All @@ -114,6 +116,7 @@ def create_tool_request_data(
image_name: str,
image_tag: str,
settings: dict[str, Any],
execution_attempt: int,
) -> dict[str, Any]:
data = {
"image_name": image_name,
Expand All @@ -125,5 +128,6 @@ def create_tool_request_data(
"settings": settings,
"envs": self.envs,
"messaging_channel": self.messaging_channel,
"execution_attempt": execution_attempt,
}
return data
12 changes: 11 additions & 1 deletion unstract/tool-sandbox/src/unstract/tool_sandbox/tool_sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,20 @@ def get_variables(self) -> Optional[dict[str, Any]]:
)
return result

def run_tool(self, run_id: str) -> Optional[dict[str, Any]]:
def run_tool(self, run_id: str, execution_attempt: int) -> Optional[dict[str, Any]]:
"""Run tool by handler
Args:
run_id (str): Run_id for the execution
execution_attempt (int): Current execution attempt
Returns:
Optional[dict[str, Any]]: _description_
"""
return self.helper.call_tool_handler( # type: ignore
run_id,
self.image_name,
self.image_tag,
self.settings,
execution_attempt,
)
Original file line number Diff line number Diff line change
Expand Up @@ -175,20 +175,24 @@ def check_tools_are_available(self, tool_ids: list[str]) -> bool:
def run_tool(
self,
run_id: str,
execution_attempt: int,
tool_sandbox: ToolSandbox,
) -> Any:
return self.run_tool_with_retry(run_id, tool_sandbox)
return self.run_tool_with_retry(run_id, tool_sandbox, execution_attempt)

def run_tool_with_retry(
self,
run_id: str,
tool_sandbox: ToolSandbox,
execution_attempt: int,
max_retries: int = ToolExecution.MAXIMUM_RETRY,
) -> Any:
error: Optional[dict[str, Any]] = None
for retry_count in range(max_retries):
try:
response = tool_sandbox.run_tool(run_id)
response = tool_sandbox.run_tool(
run_id=run_id, execution_attempt=execution_attempt
)
if response:
return response
logger.warning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ def build_workflow(self) -> None:
logger.info(f"Execution {self.execution_id}: Build completed")

def execute_workflow(
self, file_execution_id: str, file_name: str, execution_type: ExecutionType
self,
file_execution_id: str,
file_name: str,
execution_type: ExecutionType,
execution_attempt: int,
) -> None:
"""Executes the complete workflow by running each tools one by one.
Returns the result from final tool in a dictionary.
Expand All @@ -149,6 +153,7 @@ def execute_workflow(
file_execution_id (str): UUID for a single run of a file
file_name (str): Name of the file to process
execution_type (ExecutionType): STEP or COMPLETE
execution_attempt: The current attempt number for this execution
Raises:
BadRequestException: In case someone tries to execute a workflow
Expand All @@ -172,21 +177,22 @@ def execute_workflow(
tool_image=sandbox.image_name,
tool_version=sandbox.image_tag,
run_id=file_execution_id,
execution_attempt=execution_attempt,
)
logger.info(
f"Running execution: '{self.execution_id}', "
f"tool: '{sandbox.image_name}:{sandbox.image_tag}', "
f"file '{file_name}', container: '{container_name}'"
)
self._execute_step(
step=step,
sandbox=sandbox,
step=step, sandbox=sandbox, execution_attempt=execution_attempt
)
self._finalize_execution(execution_type)

def _execute_step(
self,
step: int,
execution_attempt: int,
sandbox: ToolSandbox,
) -> None:
"""Execution of workflow step.
Expand All @@ -195,6 +201,7 @@ def _execute_step(
step (int): workflow step
sandbox (ToolSandbox): instance of tool sandbox
execution_type (ExecutionType): step or complete
execution_attempt: The current attempt number for this execution
last_step_output (list[Any]): output of previous step
Raises:
Expand Down Expand Up @@ -224,7 +231,9 @@ def _execute_step(
component=tool_instance_id,
)
result = self.tool_utils.run_tool(
run_id=self.file_execution_id, tool_sandbox=sandbox
run_id=self.file_execution_id,
tool_sandbox=sandbox,
execution_attempt=execution_attempt,
)
if result and result.get("error"):
raise ToolOutputNotFoundException(result.get("error"))
Expand Down

0 comments on commit 02226d7

Please sign in to comment.