Skip to content

Commit

Permalink
[FIX] Workflows & API deployments failing for specific cases in remot…
Browse files Browse the repository at this point in the history
…e storage (#1088)

* FIx auth middleware and pdm.lock issue for filesystem

* Fixes for writing to destination connector
  • Loading branch information
gaya3-zipstack authored Jan 24, 2025
1 parent 01f0e65 commit dd6bbe9
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 11 deletions.
15 changes: 11 additions & 4 deletions backend/workflow_manager/endpoint_v2/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,15 @@ def copy_output_to_output_directory(self) -> None:

try:
destination_fs.create_dir_if_not_exists(input_dir=output_directory)

# Traverse local directory and create the same structure in the
# output_directory
for root, dirs, files in os.walk(destination_volume_path):
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
fs = file_system.get_file_storage()
dir_path = fs.walk(str(destination_volume_path))
else:
dir_path = os.walk(destination_volume_path)
for root, dirs, files in dir_path:
for dir_name in dirs:
current_dir = os.path.join(
output_directory,
Expand Down Expand Up @@ -504,7 +509,8 @@ def delete_execution_directory(self) -> None:
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
file_storage.rm(self.execution_dir, recursive=True)
if file_storage.exists(self.execution_dir):
file_storage.rm(self.execution_dir, recursive=True)
else:
fs: LocalFileSystem = fsspec.filesystem("file")
fs.rm(self.execution_dir, recursive=True)
Expand All @@ -523,7 +529,8 @@ def delete_api_storage_dir(cls, workflow_id: str, execution_id: str) -> None:
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.API_EXECUTION)
file_storage = file_system.get_file_storage()
file_storage.rm(api_storage_dir, recursive=True)
if file_storage.exists(api_storage_dir):
file_storage.rm(api_storage_dir, recursive=True)
else:
fs: LocalFileSystem = fsspec.filesystem("file")
fs.rm(api_storage_dir, recursive=True)
Expand Down
2 changes: 1 addition & 1 deletion tools/structure/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ def _summarize_and_index(
self.stream_log("Indexing summarized context")
if self.workflow_filestorage:
summarize_file_hash: str = self.workflow_filestorage.get_hash_from_file(
file_path=summarize_file_path
path=summarize_file_path
)
else:
summarize_file_hash: str = ToolUtils.get_hash_from_file(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@
import azure.core.exceptions as AzureException
from adlfs import AzureBlobFileSystem

from backend.constants import FeatureFlag
from unstract.connectors.exceptions import AzureHttpError, ConnectorError
from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem

logging.getLogger("azurefs").setLevel(logging.ERROR)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,10 +95,16 @@ def upload_file_to_storage(self, source_path: str, destination_path: str) -> Non
AzureHttpError: returns error for invalid directory
"""
normalized_path = os.path.normpath(destination_path)
fs = self.get_fsspec_fs()
destination_connector_fs = self.get_fsspec_fs()
try:
with open(source_path, "rb") as source_file:
fs.write_bytes(normalized_path, source_file.read())
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
workflow_fs = file_system.get_file_storage()
data = workflow_fs.read(path=source_path, mode="rb")
else:
with open(source_path, "rb") as source_file:
data = source_file.read()
destination_connector_fs.write_bytes(normalized_path, data)
except AzureException.HttpResponseError as e:
self.raise_http_exception(e=e, path=normalized_path)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,15 @@

from fsspec import AbstractFileSystem

from backend.constants import FeatureFlag
from unstract.connectors.base import UnstractConnector
from unstract.connectors.enums import ConnectorMode
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem

logger = logging.getLogger(__name__)


class UnstractFileSystem(UnstractConnector, ABC):
Expand Down Expand Up @@ -98,6 +105,12 @@ def upload_file_to_storage(self, source_path: str, destination_path: str) -> Non
uploaded
"""
normalized_path = os.path.normpath(destination_path)
fs = self.get_fsspec_fs()
with open(source_path, "rb") as source_file:
fs.write_bytes(normalized_path, source_file.read())
destination_connector_fs = self.get_fsspec_fs()
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
workflow_fs = file_system.get_file_storage()
data = workflow_fs.read(path=source_path, mode="rb")
else:
with open(source_path, "rb") as source_file:
data = source_file.read()
destination_connector_fs.write_bytes(normalized_path, data)

0 comments on commit dd6bbe9

Please sign in to comment.