Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX] Workflows & API deployments failing for specific cases in remote storage #1088

Merged
merged 5 commits on Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions backend/workflow_manager/endpoint_v2/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,15 @@ def copy_output_to_output_directory(self) -> None:

try:
destination_fs.create_dir_if_not_exists(input_dir=output_directory)

# Traverse local directory and create the same structure in the
# output_directory
for root, dirs, files in os.walk(destination_volume_path):
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
fs = file_system.get_file_storage()
dir_path = fs.walk(str(destination_volume_path))
else:
dir_path = os.walk(destination_volume_path)
for root, dirs, files in dir_path:
for dir_name in dirs:
current_dir = os.path.join(
output_directory,
Expand Down Expand Up @@ -504,7 +509,8 @@ def delete_execution_directory(self) -> None:
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
file_storage.rm(self.execution_dir, recursive=True)
if file_storage.exists(self.execution_dir):
gaya3-zipstack marked this conversation as resolved.
Show resolved Hide resolved
file_storage.rm(self.execution_dir, recursive=True)
else:
fs: LocalFileSystem = fsspec.filesystem("file")
fs.rm(self.execution_dir, recursive=True)
Expand All @@ -523,7 +529,8 @@ def delete_api_storage_dir(cls, workflow_id: str, execution_id: str) -> None:
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.API_EXECUTION)
file_storage = file_system.get_file_storage()
file_storage.rm(api_storage_dir, recursive=True)
if file_storage.exists(api_storage_dir):
file_storage.rm(api_storage_dir, recursive=True)
else:
fs: LocalFileSystem = fsspec.filesystem("file")
fs.rm(api_storage_dir, recursive=True)
Expand Down
2 changes: 1 addition & 1 deletion tools/structure/src/main.py
gaya3-zipstack marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ def _summarize_and_index(
self.stream_log("Indexing summarized context")
if self.workflow_filestorage:
summarize_file_hash: str = self.workflow_filestorage.get_hash_from_file(
file_path=summarize_file_path
path=summarize_file_path
)
else:
summarize_file_hash: str = ToolUtils.get_hash_from_file(
Expand Down
gaya3-zipstack marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@
import azure.core.exceptions as AzureException
from adlfs import AzureBlobFileSystem

from backend.constants import FeatureFlag
from unstract.connectors.exceptions import AzureHttpError, ConnectorError
from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem

logging.getLogger("azurefs").setLevel(logging.ERROR)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,10 +95,16 @@ def upload_file_to_storage(self, source_path: str, destination_path: str) -> Non
AzureHttpError: returns error for invalid directory
"""
normalized_path = os.path.normpath(destination_path)
fs = self.get_fsspec_fs()
destination_connector_fs = self.get_fsspec_fs()
try:
with open(source_path, "rb") as source_file:
fs.write_bytes(normalized_path, source_file.read())
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
workflow_fs = file_system.get_file_storage()
data = workflow_fs.read(path=source_path, mode="rb")
else:
with open(source_path, "rb") as source_file:
data = source_file.read()
destination_connector_fs.write_bytes(normalized_path, data)
gaya3-zipstack marked this conversation as resolved.
Show resolved Hide resolved
except AzureException.HttpResponseError as e:
self.raise_http_exception(e=e, path=normalized_path)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,15 @@

from fsspec import AbstractFileSystem

from backend.constants import FeatureFlag
from unstract.connectors.base import UnstractConnector
from unstract.connectors.enums import ConnectorMode
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem

logger = logging.getLogger(__name__)


class UnstractFileSystem(UnstractConnector, ABC):
Expand Down Expand Up @@ -98,6 +105,12 @@ def upload_file_to_storage(self, source_path: str, destination_path: str) -> Non
uploaded
"""
normalized_path = os.path.normpath(destination_path)
fs = self.get_fsspec_fs()
with open(source_path, "rb") as source_file:
fs.write_bytes(normalized_path, source_file.read())
destination_connector_fs = self.get_fsspec_fs()
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
workflow_fs = file_system.get_file_storage()
data = workflow_fs.read(path=source_path, mode="rb")
else:
with open(source_path, "rb") as source_file:
data = source_file.read()
destination_connector_fs.write_bytes(normalized_path, data)
Loading