Skip to content

Commit

Permalink
handled fetching result from filestorage
Browse files Browse the repository at this point in the history
  • Loading branch information
muhammad-ali-e committed Nov 15, 2024
1 parent bc3f278 commit 66689f1
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 5 deletions.
40 changes: 40 additions & 0 deletions backend/workflow_manager/endpoint_v2/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from connector_v2.models import ConnectorInstance
from fsspec.implementations.local import LocalFileSystem
from unstract.sdk.constants import ToolExecKey
from unstract.sdk.tool.mime_types import EXT_MIME_MAP
from unstract.workflow_execution.constants import ToolOutputType
from utils.constants import FeatureFlag
from utils.user_context import UserContext
Expand Down Expand Up @@ -411,6 +412,8 @@ def get_result(self, file_history: Optional[FileHistory] = None) -> Optional[Any
Returns:
Union[dict[str, Any], str]: Result data.
"""
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
return self.get_result_with_file_storage(file_history=file_history)
if file_history and file_history.result:
return self.parse_string(file_history.result)
output_file = os.path.join(self.execution_dir, WorkflowFileType.INFILE)
Expand Down Expand Up @@ -440,6 +443,43 @@ def get_result(self, file_history: Optional[FileHistory] = None) -> Optional[Any
logger.error(f"Error while getting result {err}")
return result

def get_result_with_file_storage(
self, file_history: Optional[FileHistory] = None
) -> Optional[Any]:
"""Get result data from the output file.
Returns:
Union[dict[str, Any], str]: Result data.
"""
if file_history and file_history.result:
return self.parse_string(file_history.result)
output_file = os.path.join(self.execution_dir, WorkflowFileType.INFILE)
metadata: dict[str, Any] = self.get_workflow_metadata()
output_type = self.get_output_type(metadata)
result: Union[dict[str, Any], str] = ""
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
try:
# TODO: SDK handles validation; consider removing here.
file_type = file_storage.mime_type(output_file)
if output_type == ToolOutputType.JSON:
if file_type != EXT_MIME_MAP[ToolOutputType.JSON.lower()]:
logger.error(f"Output type json mismatched {file_type}")
raise ToolOutputTypeMismatch()
file_content = file_storage.read(output_file, mode="r")
result = json.loads(file_content)
elif output_type == ToolOutputType.TXT:
if file_type == EXT_MIME_MAP[ToolOutputType.JSON.lower()]:
logger.error(f"Output type txt mismatched {file_type}")
raise ToolOutputTypeMismatch()
file_content = file_storage.read(output_file, mode="r")
result = file_content.encode("utf-8").decode("unicode-escape")
else:
raise InvalidToolOutputType()
except (FileNotFoundError, json.JSONDecodeError) as err:
logger.error(f"Error while getting result {err}")
return result

def get_metadata(
self, file_history: Optional[FileHistory] = None
) -> Optional[dict[str, Any]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class ToolRuntimeVariable:
ADAPTER_LLMW_POLL_INTERVAL = "ADAPTER_LLMW_POLL_INTERVAL"
ADAPTER_LLMW_MAX_POLLS = "ADAPTER_LLMW_MAX_POLLS"
EXECUTION_BY_TOOL = "EXECUTION_BY_TOOL"
WORKFLOW_EXECUTION_DIR_PREFIX = "WORKFLOW_EXECUTION_DIR_PREFIX"
API_EXECUTION_DIR_PREFIX = "API_EXECUTION_DIR_PREFIX"


class WorkflowFileType:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
MetaDataKey,
ToolMetadataKey,
ToolOutputType,
ToolRuntimeVariable,
WorkflowFileType,
)
from unstract.workflow_execution.exceptions import ToolMetadataNotFound
from unstract.workflow_execution.tools_utils import ToolsUtils

from unstract.flags.feature_flag import check_feature_flag_status

Expand Down Expand Up @@ -183,9 +185,12 @@ def get_execution_dir(
Returns:
str: The directory path for the execution.
"""
execution_dir = f"/execution/{organization_id}/{workflow_id}/{execution_id}"
path_prefix = ToolsUtils.get_env(
ToolRuntimeVariable.WORKFLOW_EXECUTION_DIR_PREFIX
)
execution_dir = Path(path_prefix) / organization_id / workflow_id / execution_id

return execution_dir
return str(execution_dir)

@classmethod
def get_api_execution_dir(
Expand All @@ -202,6 +207,6 @@ def get_api_execution_dir(
Returns:
str: The directory path for the execution.
"""

execution_dir = f"/api/{organization_id}/{workflow_id}/{execution_id}"
return execution_dir
path_prefix = ToolsUtils.get_env(ToolRuntimeVariable.API_EXECUTION_DIR_PREFIX)
execution_dir = Path(path_prefix) / organization_id / workflow_id / execution_id
return str(execution_dir)

0 comments on commit 66689f1

Please sign in to comment.