diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py
index 45047419f..fe8e1dce6 100644
--- a/backend/workflow_manager/endpoint_v2/destination.py
+++ b/backend/workflow_manager/endpoint_v2/destination.py
@@ -237,10 +237,15 @@ def copy_output_to_output_directory(self) -> None:
         try:
             destination_fs.create_dir_if_not_exists(input_dir=output_directory)
 
-            # Traverse local directory and create the same structure in the
             # output_directory
-            for root, dirs, files in os.walk(destination_volume_path):
+            if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
+                file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
+                fs = file_system.get_file_storage()
+                dir_path = fs.walk(str(destination_volume_path))
+            else:
+                dir_path = os.walk(destination_volume_path)
+            for root, dirs, files in dir_path:
                 for dir_name in dirs:
                     current_dir = os.path.join(
                         output_directory,
@@ -504,7 +509,8 @@ def delete_execution_directory(self) -> None:
         if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
             file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
             file_storage = file_system.get_file_storage()
-            file_storage.rm(self.execution_dir, recursive=True)
+            if file_storage.exists(self.execution_dir):
+                file_storage.rm(self.execution_dir, recursive=True)
         else:
             fs: LocalFileSystem = fsspec.filesystem("file")
             fs.rm(self.execution_dir, recursive=True)
@@ -523,7 +529,8 @@ def delete_api_storage_dir(cls, workflow_id: str, execution_id: str) -> None:
         if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
             file_system = FileSystem(FileStorageType.API_EXECUTION)
             file_storage = file_system.get_file_storage()
-            file_storage.rm(api_storage_dir, recursive=True)
+            if file_storage.exists(api_storage_dir):
+                file_storage.rm(api_storage_dir, recursive=True)
         else:
             fs: LocalFileSystem = fsspec.filesystem("file")
             fs.rm(api_storage_dir, recursive=True)
diff --git a/tools/structure/src/main.py b/tools/structure/src/main.py
index 09f400f8b..6a6149ef6 100644
--- a/tools/structure/src/main.py
+++ b/tools/structure/src/main.py
@@ -385,7 +385,7 @@ def _summarize_and_index(
         self.stream_log("Indexing summarized context")
         if self.workflow_filestorage:
             summarize_file_hash: str = self.workflow_filestorage.get_hash_from_file(
-                file_path=summarize_file_path
+                path=summarize_file_path
             )
         else:
             summarize_file_hash: str = ToolUtils.get_hash_from_file(
diff --git a/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py b/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py
index caa5ca179..c9898343e 100644
--- a/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py
+++ b/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py
@@ -5,8 +5,13 @@
 import azure.core.exceptions as AzureException
 from adlfs import AzureBlobFileSystem
 
+from backend.constants import FeatureFlag
 from unstract.connectors.exceptions import AzureHttpError, ConnectorError
 from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem
+from unstract.flags.feature_flag import check_feature_flag_status
+
+if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
+    from unstract.filesystem import FileStorageType, FileSystem
 
 logging.getLogger("azurefs").setLevel(logging.ERROR)
 logger = logging.getLogger(__name__)
@@ -90,10 +95,16 @@ def upload_file_to_storage(self, source_path: str, destination_path: str) -> None:
             AzureHttpError: returns error for invalid directory
         """
         normalized_path = os.path.normpath(destination_path)
-        fs = self.get_fsspec_fs()
+        destination_connector_fs = self.get_fsspec_fs()
         try:
-            with open(source_path, "rb") as source_file:
-                fs.write_bytes(normalized_path, source_file.read())
+            if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
+                file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
+                workflow_fs = file_system.get_file_storage()
+                data = workflow_fs.read(path=source_path, mode="rb")
+            else:
+                with open(source_path, "rb") as source_file:
+                    data = source_file.read()
+            destination_connector_fs.write_bytes(normalized_path, data)
         except AzureException.HttpResponseError as e:
             self.raise_http_exception(e=e, path=normalized_path)
diff --git a/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py b/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py
index 75389c772..51542357d 100644
--- a/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py
+++ b/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py
@@ -5,8 +5,15 @@
 
 from fsspec import AbstractFileSystem
 
+from backend.constants import FeatureFlag
 from unstract.connectors.base import UnstractConnector
 from unstract.connectors.enums import ConnectorMode
+from unstract.flags.feature_flag import check_feature_flag_status
+
+if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
+    from unstract.filesystem import FileStorageType, FileSystem
+
+logger = logging.getLogger(__name__)
 
 
 class UnstractFileSystem(UnstractConnector, ABC):
@@ -98,6 +105,12 @@ def upload_file_to_storage(self, source_path: str, destination_path: str) -> None:
             uploaded
         """
         normalized_path = os.path.normpath(destination_path)
-        fs = self.get_fsspec_fs()
-        with open(source_path, "rb") as source_file:
-            fs.write_bytes(normalized_path, source_file.read())
+        destination_connector_fs = self.get_fsspec_fs()
+        if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
+            file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
+            workflow_fs = file_system.get_file_storage()
+            data = workflow_fs.read(path=source_path, mode="rb")
+        else:
+            with open(source_path, "rb") as source_file:
+                data = source_file.read()
+        destination_connector_fs.write_bytes(normalized_path, data)