From 265d5b961dccef55d63abca414763e1d4036473c Mon Sep 17 00:00:00 2001 From: Gayathri <142381512+gaya3-zipstack@users.noreply.github.com> Date: Tue, 3 Dec 2024 14:23:41 +0530 Subject: [PATCH] Handle file not found using cache invalidation and retry again (#130) * contextSizeChanges * contextSizeChanges * Version roll and test folder check in * Fix enum values * Fix test cases, address review comments * Address review comments * Update pyproject.toml Co-authored-by: Chandrasekharan M <117059509+chandrasekharan-zipstack@users.noreply.github.com> Signed-off-by: Gayathri <142381512+gaya3-zipstack@users.noreply.github.com> * Address mypy issues * Change class design and implementation * Remove unused definitions * Add cp() and function refactoring * Check-in sample env * Default value of dict changed to None * Add size() * Refctor for using FileStorage * Refactor to use FileStorage * Fix issues * Add mim_type, download functions * change comments * Refactor het_hash_from_file * Add return types * Remove permanent file storage from sdk * Fix SDK functional issues * Support minio * Test cases for Minio * Bring file variants back to sdk * Fix copy_on_write * Add new test cases for uploadd/download * Add new functions to support platform-service * Change modififcation_time return type to datetime * Refactor env pick-up logic * Sample env * contextSizeChanges * Remove commented code and some improvisations * contextSizeChanges * Add right JSON formatted string * Update src/unstract/sdk/file_storage/fs_permanent.py Co-authored-by: Chandrasekharan M <117059509+chandrasekharan-zipstack@users.noreply.github.com> Signed-off-by: Gayathri <142381512+gaya3-zipstack@users.noreply.github.com> * Address review comments * Address review comments * Update src/unstract/sdk/file_storage/fs_shared_temporary.py Co-authored-by: ali <117142933+muhammad-ali-e@users.noreply.github.com> Signed-off-by: Gayathri <142381512+gaya3-zipstack@users.noreply.github.com> * Refactor for change in enum value * Add return type * Support glob * Add function to interface * Update env format * Support legacy storage and get_hash_from_file * Change sample path * Update test env * Add yaml_dump function * add more functions * Type the args * Add file not found exception * Optimise checks * Setup python version * Handle file not found using cache invalidation and retry * Revert a change * Renaming * Add env helper for * Add sample env * Update src/unstract/sdk/file_storage/env_helper.py Co-authored-by: Chandrasekharan M <117059509+chandrasekharan-zipstack@users.noreply.github.com> Signed-off-by: Gayathri <142381512+gaya3-zipstack@users.noreply.github.com> * Review comments - Address --------- Signed-off-by: Gayathri <142381512+gaya3-zipstack@users.noreply.github.com> Co-authored-by: Chandrasekharan M <117059509+chandrasekharan-zipstack@users.noreply.github.com> Co-authored-by: ali <117142933+muhammad-ali-e@users.noreply.github.com> --- src/unstract/sdk/file_storage/__init__.py | 8 +- src/unstract/sdk/file_storage/constants.py | 13 ++ src/unstract/sdk/file_storage/env_helper.py | 37 +++++ src/unstract/sdk/file_storage/helper.py | 25 ++- .../sdk/file_storage/{fs_impl.py => impl.py} | 157 ++++++------------ .../{fs_interface.py => interface.py} | 0 .../{fs_permanent.py => permanent.py} | 4 +- .../{fs_provider.py => provider.py} | 0 ...hared_temporary.py => shared_temporary.py} | 0 tests/sample.env | 3 + tests/test_file_storage.py | 42 ++++- 11 files changed, 175 insertions(+), 114 deletions(-) create mode 100644 src/unstract/sdk/file_storage/env_helper.py rename src/unstract/sdk/file_storage/{fs_impl.py => impl.py} (69%) rename src/unstract/sdk/file_storage/{fs_interface.py => interface.py} (100%) rename src/unstract/sdk/file_storage/{fs_permanent.py => permanent.py} (96%) rename src/unstract/sdk/file_storage/{fs_provider.py => provider.py} (100%) rename src/unstract/sdk/file_storage/{fs_shared_temporary.py => shared_temporary.py} (100%) diff --git a/src/unstract/sdk/file_storage/__init__.py b/src/unstract/sdk/file_storage/__init__.py index 1a3e6cf9..6c20692b 100644 --- a/src/unstract/sdk/file_storage/__init__.py +++ b/src/unstract/sdk/file_storage/__init__.py @@ -6,8 +6,8 @@ "SharedTemporaryFileStorage", ] -from unstract.sdk.file_storage.fs_impl import FileStorage -from unstract.sdk.file_storage.fs_permanent import PermanentFileStorage -from unstract.sdk.file_storage.fs_provider import FileStorageProvider -from unstract.sdk.file_storage.fs_shared_temporary import SharedTemporaryFileStorage from unstract.sdk.file_storage.helper import FileStorageHelper +from unstract.sdk.file_storage.impl import FileStorage +from unstract.sdk.file_storage.permanent import PermanentFileStorage +from unstract.sdk.file_storage.provider import FileStorageProvider +from unstract.sdk.file_storage.shared_temporary import SharedTemporaryFileStorage diff --git a/src/unstract/sdk/file_storage/constants.py b/src/unstract/sdk/file_storage/constants.py index b93b156a..8a829773 100644 --- a/src/unstract/sdk/file_storage/constants.py +++ b/src/unstract/sdk/file_storage/constants.py @@ -1,3 +1,6 @@ +from enum import Enum + + class FileOperationParams: READ_ENTIRE_LENGTH = -1 DEFAULT_ENCODING = "utf-8" @@ -7,3 +10,13 @@ class FileSeekPosition: START = 0 CURRENT = 1 END = 2 + + +class StorageType(Enum): + PERMANENT = "permanent" + TEMPORARY = "temporary" + + +class CredentialKeyword: + PROVIDER = "provider" + CREDENTIALS = "credentials" diff --git a/src/unstract/sdk/file_storage/env_helper.py b/src/unstract/sdk/file_storage/env_helper.py new file mode 100644 index 00000000..d8a1de2c --- /dev/null +++ b/src/unstract/sdk/file_storage/env_helper.py @@ -0,0 +1,37 @@ +import json +import logging +import os + +from unstract.sdk.exceptions import FileStorageError +from unstract.sdk.file_storage.constants import CredentialKeyword, StorageType +from unstract.sdk.file_storage.impl import FileStorage +from unstract.sdk.file_storage.permanent import PermanentFileStorage +from unstract.sdk.file_storage.provider import FileStorageProvider +from unstract.sdk.file_storage.shared_temporary import SharedTemporaryFileStorage + +logger = logging.getLogger(__name__) + + +class EnvHelper: + @staticmethod + def get_storage(storage_type: StorageType, env_name: str) -> FileStorage: + try: + file_storage_creds = json.loads(os.environ.get(env_name)) + provider = FileStorageProvider( + file_storage_creds[CredentialKeyword.PROVIDER] + ) + credentials = file_storage_creds.get(CredentialKeyword.CREDENTIALS, {}) + if storage_type == StorageType.PERMANENT.value: + file_storage = PermanentFileStorage(provider=provider, **credentials) + elif storage_type == StorageType.TEMPORARY.value: + file_storage = SharedTemporaryFileStorage( + provider=provider, **credentials + ) + else: + raise NotImplementedError() + return file_storage + except KeyError as e: + logger.error(f"Required credentials is missing in the env: {str(e)}") + raise e + except FileStorageError as e: + raise e diff --git a/src/unstract/sdk/file_storage/helper.py b/src/unstract/sdk/file_storage/helper.py index b23b2bda..911edddc 100644 --- a/src/unstract/sdk/file_storage/helper.py +++ b/src/unstract/sdk/file_storage/helper.py @@ -4,8 +4,8 @@ import fsspec from fsspec import AbstractFileSystem -from unstract.sdk.exceptions import FileStorageError -from unstract.sdk.file_storage.fs_provider import FileStorageProvider +from unstract.sdk.exceptions import FileOperationError, FileStorageError +from unstract.sdk.file_storage.provider import FileStorageProvider logger = logging.getLogger(__name__) @@ -67,3 +67,24 @@ def local_file_system_init() -> AbstractFileSystem: f" file system {e}" ) raise FileStorageError(str(e)) from e + + +def skip_local_cache(func): + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except FileNotFoundError: + try: + # FileNotFound could have been caused by stale cache. + # Hence invalidate cache and retry again + args[0].fs.invalidate_cache() + return func(*args, **kwargs) + except Exception as e: + if isinstance(e, FileNotFoundError): + raise e + else: + raise FileOperationError(str(e)) from e + except Exception as e: + raise FileOperationError(str(e)) from e + + return wrapper diff --git a/src/unstract/sdk/file_storage/fs_impl.py b/src/unstract/sdk/file_storage/impl.py similarity index 69% rename from src/unstract/sdk/file_storage/fs_impl.py rename to src/unstract/sdk/file_storage/impl.py index d6677aab..e5cade8a 100644 --- a/src/unstract/sdk/file_storage/fs_impl.py +++ b/src/unstract/sdk/file_storage/impl.py @@ -10,9 +10,9 @@ from unstract.sdk.exceptions import FileOperationError from unstract.sdk.file_storage.constants import FileOperationParams, FileSeekPosition -from unstract.sdk.file_storage.fs_interface import FileStorageInterface -from unstract.sdk.file_storage.fs_provider import FileStorageProvider -from unstract.sdk.file_storage.helper import FileStorageHelper +from unstract.sdk.file_storage.helper import FileStorageHelper, skip_local_cache +from unstract.sdk.file_storage.interface import FileStorageInterface +from unstract.sdk.file_storage.provider import FileStorageProvider logger = logging.getLogger(__name__) @@ -21,10 +21,13 @@ class FileStorage(FileStorageInterface): # This class integrates fsspec library for file operations fs: fsspec # fsspec file system handle + provider: FileStorageProvider def __init__(self, provider: FileStorageProvider, **storage_config: dict[str, Any]): self.fs = FileStorageHelper.file_storage_init(provider, **storage_config) + self.provider = provider + @skip_local_cache def read( self, path: str, @@ -47,15 +50,10 @@ def read( Returns: Union[bytes, str] - File contents in bytes/string based on the opened mode """ - try: - with self.fs.open(path=path, mode=mode, encoding=encoding) as file_handle: - if seek_position > 0: - file_handle.seek(seek_position) - return file_handle.read(length) - except FileNotFoundError as e: - raise e - except Exception as e: - raise FileOperationError(str(e)) from e + with self.fs.open(path=path, mode=mode, encoding=encoding) as file_handle: + if seek_position > 0: + file_handle.seek(seek_position) + return file_handle.read(length) def write( self, @@ -84,6 +82,7 @@ def write( except Exception as e: raise FileOperationError(str(e)) from e + @skip_local_cache def seek( self, path: str, @@ -103,13 +102,8 @@ def seek( Returns: int: file pointer location after seeking to the mentioned position """ - try: - with self.fs.open(path=path, mode="rb") as file_handle: - return file_handle.seek(location, position) - except FileNotFoundError as e: - raise e - except Exception as e: - raise FileOperationError(str(e)) from e + with self.fs.open(path=path, mode="rb") as file_handle: + return file_handle.seek(location, position) def mkdir(self, path: str, create_parents: bool = True): """Create a directory. @@ -140,6 +134,7 @@ def exists(self, path: str) -> bool: except Exception as e: raise FileOperationError(str(e)) from e + @skip_local_cache def ls(self, path: str) -> list[str]: """List the directory path. @@ -149,13 +144,9 @@ def ls(self, path: str) -> list[str]: Returns: List[str]: List of files / directories under the path """ - try: - return self.fs.ls(path) - except FileNotFoundError as e: - raise e - except Exception as e: - raise FileOperationError(str(e)) from e + return self.fs.ls(path) + @skip_local_cache def rm(self, path: str, recursive: bool = True): """Removes a file or directory mentioned in path. @@ -167,14 +158,9 @@ def rm(self, path: str, recursive: bool = True): Returns: NA """ - try: - return self.fs.rm(path=path, recursive=recursive) - except FileNotFoundError as e: - logger.debug(f"Path {path} does not exist.") - raise e - except Exception as e: - raise FileOperationError(str(e)) from e + return self.fs.rm(path=path, recursive=recursive) + @skip_local_cache def cp(self, src: str, dest: str, overwrite: bool = True): """Copies files from source(lpath) path to the destination(rpath) path. @@ -185,13 +171,9 @@ def cp(self, src: str, dest: str, overwrite: bool = True): Returns: NA """ - try: - return self.fs.cp(src, dest, overwrite=overwrite) - except FileNotFoundError as e: - raise e - except Exception as e: - raise FileOperationError(str(e)) from e + return self.fs.cp(src, dest, overwrite=overwrite) + @skip_local_cache def size(self, path: str) -> int: """Get the size of the file specified in path. @@ -201,14 +183,10 @@ def size(self, path: str) -> int: Returns: int: Size of the file in bytes """ - try: - file_info = self.fs.info(path) - return file_info["size"] - except FileNotFoundError as e: - raise e - except Exception as e: - raise FileOperationError(str(e)) from e + file_info = self.fs.info(path) + return file_info["size"] + @skip_local_cache def modification_time(self, path: str) -> datetime: """Get the last modification time of the file specified in path. @@ -218,17 +196,13 @@ def modification_time(self, path: str) -> datetime: Returns: datetime: Last modified time in datetime """ - try: - file_info = self.fs.info(path) - file_mtime = file_info["mtime"] - if not isinstance(file_mtime, datetime): - file_mtime = datetime.fromtimestamp(file_mtime) - return file_mtime - except FileNotFoundError as e: - raise e - except Exception as e: - raise FileOperationError(str(e)) from e + file_info = self.fs.info(path) + file_mtime = file_info["mtime"] + if not isinstance(file_mtime, datetime): + file_mtime = datetime.fromtimestamp(file_mtime) + return file_mtime + @skip_local_cache def mime_type(self, path: str) -> str: """Gets the file MIME type for an input file. Uses libmagic to perform the same. @@ -239,15 +213,11 @@ def mime_type(self, path: str) -> str: Returns: str: MIME type of the file """ - try: - sample_contents = self.read(path=path, mode="rb", length=100) - mime_type = magic.from_buffer(sample_contents, mime=True) - return mime_type - except FileNotFoundError as e: - raise e - except Exception as e: - raise FileOperationError(str(e)) from e + sample_contents = self.read(path=path, mode="rb", length=100) + mime_type = magic.from_buffer(sample_contents, mime=True) + return mime_type + @skip_local_cache def download(self, from_path: str, to_path: str): """Downloads the file mentioned in from_path to to_path on the local system. The instance calling the method needs to be the FileStorage @@ -261,14 +231,9 @@ def download(self, from_path: str, to_path: str): Returns: NA """ - try: - self.fs.get(rpath=from_path, lpath=to_path) - except FileNotFoundError as e: - logger.error(f"Path {from_path} does not exist.") - raise e - except Exception as e: - raise FileOperationError(str(e)) from e + self.fs.get(rpath=from_path, lpath=to_path) + @skip_local_cache def upload(self, from_path: str, to_path: str): """Uploads the file mentioned in from_path (local system) to to_path (remote system). The instance calling the method needs to be the @@ -282,13 +247,7 @@ def upload(self, from_path: str, to_path: str): Returns: NA """ - try: - self.fs.put(from_path, to_path) - except FileNotFoundError as e: - logger.error(f"Path {from_path} does not exist.") - raise e - except Exception as e: - raise FileOperationError(str(e)) from e + self.fs.put(from_path, to_path) def glob(self, path: str) -> list[str]: """Lists files under path matching the pattern sepcified as part of @@ -307,6 +266,7 @@ def glob(self, path: str) -> list[str]: except Exception as e: raise FileOperationError(str(e)) from e + @skip_local_cache def get_hash_from_file(self, path: str) -> str: """Computes the hash for a file. @@ -319,18 +279,13 @@ def get_hash_from_file(self, path: str) -> str: str: SHA256 hash of the file """ - try: - h = sha256() - b = bytearray(128 * 1024) - mv = memoryview(b) - with self.fs.open(path) as f: - while n := f.readinto(mv): - h.update(mv[:n]) - return str(h.hexdigest()) - except FileNotFoundError as e: - raise e - except Exception as e: - raise FileOperationError(str(e)) from e + h = sha256() + b = bytearray(128 * 1024) + mv = memoryview(b) + with self.fs.open(path) as f: + while n := f.readinto(mv): + h.update(mv[:n]) + return str(h.hexdigest()) def json_dump( self, @@ -370,16 +325,13 @@ def yaml_dump( except Exception as e: raise FileOperationError(str(e)) from e + @skip_local_cache def json_load(self, path: str) -> dict[Any, Any]: - try: - with self.fs.open(path=path) as json_file: - data: dict[str, Any] = json.load(json_file) - return data - except FileNotFoundError as e: - raise e - except Exception as e: - raise FileOperationError(str(e)) from e + with self.fs.open(path=path) as json_file: + data: dict[str, Any] = json.load(json_file) + return data + @skip_local_cache def yaml_load( self, path: str, @@ -392,11 +344,6 @@ def yaml_load( Returns: dict[Any, Any]: Data loaded as yaml """ - try: - with self.fs.open(path=path) as f: - data: dict[str, Any] = yaml.safe_load(f) - return data - except FileNotFoundError as e: - raise e - except Exception as e: - raise FileOperationError(str(e)) from e + with self.fs.open(path=path) as f: + data: dict[str, Any] = yaml.safe_load(f) + return data diff --git a/src/unstract/sdk/file_storage/fs_interface.py b/src/unstract/sdk/file_storage/interface.py similarity index 100% rename from src/unstract/sdk/file_storage/fs_interface.py rename to src/unstract/sdk/file_storage/interface.py diff --git a/src/unstract/sdk/file_storage/fs_permanent.py b/src/unstract/sdk/file_storage/permanent.py similarity index 96% rename from src/unstract/sdk/file_storage/fs_permanent.py rename to src/unstract/sdk/file_storage/permanent.py index 72226f89..588889fb 100644 --- a/src/unstract/sdk/file_storage/fs_permanent.py +++ b/src/unstract/sdk/file_storage/permanent.py @@ -3,8 +3,8 @@ from unstract.sdk.exceptions import FileOperationError, FileStorageError from unstract.sdk.file_storage.constants import FileOperationParams -from unstract.sdk.file_storage.fs_impl import FileStorage -from unstract.sdk.file_storage.fs_provider import FileStorageProvider +from unstract.sdk.file_storage.impl import FileStorage +from unstract.sdk.file_storage.provider import FileStorageProvider logger = logging.getLogger(__name__) diff --git a/src/unstract/sdk/file_storage/fs_provider.py b/src/unstract/sdk/file_storage/provider.py similarity index 100% rename from src/unstract/sdk/file_storage/fs_provider.py rename to src/unstract/sdk/file_storage/provider.py diff --git a/src/unstract/sdk/file_storage/fs_shared_temporary.py b/src/unstract/sdk/file_storage/shared_temporary.py similarity index 100% rename from src/unstract/sdk/file_storage/fs_shared_temporary.py rename to src/unstract/sdk/file_storage/shared_temporary.py diff --git a/tests/sample.env b/tests/sample.env index cfce273e..d6078904 100644 --- a/tests/sample.env +++ b/tests/sample.env @@ -24,3 +24,6 @@ TEXT_CONTENT=Writing directly from string as read_file is not passed FILE_STORAGE_GCS='{"token": "/path/to/google/creds.json"}' FILE_STORAGE_MINIO='{"endpoint_url": "http://localhost:9000","key": "xxxx", "secret": "xxxx"}' FILE_STORAGE_LOCAL='{"auto_mkdir": True}' + +TEST_PERMANENT_STORAGE='{"provider": "gcs", "credentials": {"token": "/path/to/google/creds.json"}}' +TEST_TEMPORARY_STORAGE='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "xxxx", "secret": "xxxx"}}' diff --git a/tests/test_file_storage.py b/tests/test_file_storage.py index 50287343..1db63ed3 100644 --- a/tests/test_file_storage.py +++ b/tests/test_file_storage.py @@ -9,7 +9,10 @@ from unstract.sdk.constants import MimeType from unstract.sdk.exceptions import FileOperationError -from unstract.sdk.file_storage import FileStorage, FileStorageProvider +from unstract.sdk.file_storage.constants import StorageType +from unstract.sdk.file_storage.env_helper import EnvHelper +from unstract.sdk.file_storage.impl import FileStorage +from unstract.sdk.file_storage.provider import FileStorageProvider load_dotenv() @@ -90,6 +93,22 @@ def test_file_read(file_storage, path, mode, read_length, expected_read_length): assert len(file_contents) == expected_read_length +@pytest.mark.parametrize( + "file_storage, path, mode, read_length", + [ + ( + file_storage(provider=FileStorageProvider.LOCAL), + "1.txt", + "rb", + -1, + ), + ], +) +def test_file_read_exception(file_storage, path, mode, read_length): + with pytest.raises(FileNotFoundError): + file_storage.read(path=path, mode=mode, length=read_length) + + @pytest.mark.parametrize( "file_storage, read_file_path, read_mode, file_contents, " "write_file_path, write_mode, read_length, expected_write_length", @@ -699,3 +718,24 @@ def test_glob(file_storage, folder_path, expected_result): file_list = file_storage.glob(path=folder_path) print(f"Files: {file_list}") assert len(file_list) == expected_result + + +@pytest.mark.parametrize( + "storage_type, env_name, expected", + [ + ( + StorageType.PERMANENT.value, + "TEST_PERMANENT_STORAGE", + FileStorageProvider.GCS, + ), + ( + StorageType.TEMPORARY.value, + "TEST_TEMPORARY_STORAGE", + FileStorageProvider.MINIO, + ), + ], +) +def test_get_storage(storage_type, env_name, expected): + file_storage = EnvHelper.get_storage(storage_type, env_name) + assert file_storage.provider == expected + print(file_storage)