From b1fb4e50be65196214553e55d3552c4b93cfc8ed Mon Sep 17 00:00:00 2001
From: gayathrivijayakumar
Date: Tue, 28 Jan 2025 14:16:23 +0530
Subject: [PATCH 1/2] Remove REMOTE_FILE_STORAGE feature flag conditions

Remote file storage is now always enabled, so drop every
check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE) branch and the
local-filesystem fallbacks those branches guarded.
---
 .../prompt_studio_core_v2/models.py           |  53 +---
 .../prompt_ide_base_tool.py                   |  11 +-
 .../prompt_studio_helper.py                   | 289 ++++++------------
 .../prompt_studio_core_v2/views.py            |  74 +----
 .../endpoint_v2/base_connector.py             |  20 +-
 .../endpoint_v2/destination.py                |  74 +----
 .../workflow_manager/endpoint_v2/source.py    |  68 ++---
 .../helper/cost_calculation.py                | 128 +++-----
 .../src/unstract/prompt_service/helper.py     | 132 ++++----
 runner/src/unstract/runner/clients/docker.py  |  17 +-
 runner/src/unstract/runner/runner.py          |  26 +-
 tools/classifier/src/helper.py                |  47 +--
 tools/structure/src/main.py                   |  75 ++---
 tools/text_extractor/src/main.py              |  21 +-
 .../azure_cloud_storage.py                    |  16 +-
 .../filesystems/unstract_file_system.py       |  16 +-
 .../unstract/tool_registry/tool_registry.py   |  67 ++--
 .../src/unstract/tool_registry/tool_utils.py  |  31 +-
 .../execution_file_handler.py                 |  40 +--
 19 files changed, 368 insertions(+), 837 deletions(-)

diff --git a/backend/prompt_studio/prompt_studio_core_v2/models.py b/backend/prompt_studio/prompt_studio_core_v2/models.py
index c4a71cf15..3a6cc9da5 100644
--- a/backend/prompt_studio/prompt_studio_core_v2/models.py
+++ b/backend/prompt_studio/prompt_studio_core_v2/models.py
@@ -1,5 +1,4 @@
 import logging
-import shutil
 import uuid
 from typing import Any
 
@@ -7,7 +6,6 @@
 from adapter_processor_v2.models import AdapterInstance
 from django.db import models
 from django.db.models import QuerySet
-from file_management.file_management_helper import FileManagerHelper
 from prompt_studio.prompt_studio_core_v2.constants import DefaultPrompts
 from unstract.sdk.file_storage.constants import StorageType
 from unstract.sdk.file_storage.env_helper import EnvHelper
@@ -19,9 +17,6 @@
     DefaultOrganizationMixin,
 )
 
-from backend.constants import FeatureFlag
-from unstract.flags.feature_flag import check_feature_flag_status
-
 logger = logging.getLogger(__name__)
 
 
@@ -140,38 +135,22 @@ class CustomTool(DefaultOrganizationMixin, BaseModel):
 
     def delete(self, organization_id=None, *args, **kwargs):
         # Delete the documents associated with the tool
-        if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
-            file_path = FileManagerHelper.handle_sub_directory_for_tenants(
-                organization_id,
-                is_create=False,
-                user_id=self.created_by.user_id,
-                tool_id=str(self.tool_id),
-            )
-            if organization_id:
-                try:
-                    shutil.rmtree(file_path)
-                except FileNotFoundError:
-                    logger.error(f"The folder {file_path} does not exist.")
-                except OSError as e:
-                    logger.error(f"Error: {file_path} : {e.strerror}")
-            # Continue with the deletion of the tool
-        else:
-            fs_instance = EnvHelper.get_storage(
-                storage_type=StorageType.PERMANENT,
-                env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
-            )
-            file_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory(
-                organization_id,
-                is_create=False,
-                user_id=self.created_by.user_id,
-                tool_id=str(self.tool_id),
-            )
-            try:
-                fs_instance.rm(file_path, True)
-            except FileNotFoundError:
-                # Supressed to handle cases when the remote
-                # file is missing or already deleted
-                pass
+        fs_instance = EnvHelper.get_storage(
+            storage_type=StorageType.PERMANENT,
+            env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
+        )
+        file_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory(
+            organization_id,
+            is_create=False,
+            user_id=self.created_by.user_id,
+            tool_id=str(self.tool_id),
+        )
+        try:
+            fs_instance.rm(file_path, True)
+        except FileNotFoundError:
+            # Suppressed to handle cases when the remote
+            # file is missing or already deleted
+            pass
         super().delete(*args, **kwargs)
 
     class Meta:
diff --git a/backend/prompt_studio/prompt_studio_core_v2/prompt_ide_base_tool.py b/backend/prompt_studio/prompt_studio_core_v2/prompt_ide_base_tool.py
index e72a7ca76..f3ff4bcf7 100644
--- a/backend/prompt_studio/prompt_studio_core_v2/prompt_ide_base_tool.py
+++ b/backend/prompt_studio/prompt_studio_core_v2/prompt_ide_base_tool.py
@@ -5,9 +5,6 @@
 from unstract.sdk.constants import LogLevel
 from unstract.sdk.tool.stream import StreamMixin
 
-from backend.constants import FeatureFlag
-from unstract.flags.feature_flag import check_feature_flag_status
-
 
 class PromptIdeBaseTool(StreamMixin):
     def __init__(self, log_level: LogLevel = LogLevel.INFO, org_id: str = "") -> None:
@@ -20,11 +17,11 @@ def __init__(self, log_level: LogLevel = LogLevel.INFO, org_id: str = "") -> Non
         self.log_level = log_level
         self.org_id = org_id
         self.workflow_filestorage = None
-        if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
-            from unstract.filesystem import FileStorageType, FileSystem
 
-            file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
-            self.workflow_filestorage = file_system.get_file_storage()
+        from unstract.filesystem import FileStorageType, FileSystem
+
+        file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
+        self.workflow_filestorage = file_system.get_file_storage()
 
         super().__init__(log_level=log_level)
 
diff --git a/backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py b/backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
index 3c6044223..06a2fadbf 100644
--- a/backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
+++ b/backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
@@ -12,7 +12,6 @@
 from adapter_processor_v2.models import AdapterInstance
 from django.conf import settings
 from django.db.models.manager import BaseManager
-from file_management.file_management_helper import FileManagerHelper
 from prompt_studio.modifier_loader import ModifierConfig
 from prompt_studio.modifier_loader import load_plugins as load_modifier_plugins
 from prompt_studio.prompt_profile_manager_v2.models import ProfileManager
@@ -60,14 +59,11 @@
 from unstract.sdk.file_storage.env_helper import EnvHelper
 from unstract.sdk.index import Index
 from unstract.sdk.prompt import PromptTool
-from unstract.sdk.utils.tool_utils import ToolUtils
 from utils.file_storage.constants import FileStorageKeys
 from utils.file_storage.helpers.prompt_studio_file_helper import PromptStudioFileHelper
 from utils.local_context import StateStore
 
-from backend.constants import FeatureFlag
 from unstract.core.pubsub_helper import LogPublisher
-from unstract.flags.feature_flag import check_feature_flag_status
 
 CHOICES_JSON = "/static/select_choices.json"
 ERROR_MSG = "User %s doesn't have access to adapter %s"
@@ -343,22 +339,12 @@ def index_document(
             file_path = file_name
         else:
             default_profile = ProfileManager.get_default_llm_profile(tool)
-            if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
-                file_path = FileManagerHelper.handle_sub_directory_for_tenants(
-                    org_id,
-                    is_create=False,
-                    user_id=user_id,
-                    tool_id=tool_id,
-                )
-            else:
-                file_path = (
-                    PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory(
-                        org_id,
-                        is_create=False,
-                        user_id=user_id,
-                        tool_id=tool_id,
-                    )
-                )
+            file_path = 
PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory( + org_id, + is_create=False, + user_id=user_id, + tool_id=tool_id, + ) file_path = str(Path(file_path) / file_name) if not tool: @@ -382,37 +368,24 @@ def index_document( process_text = None if text_processor: process_text = text_processor.process - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - doc_id = PromptStudioHelper.dynamic_indexer( - profile_manager=default_profile, - tool_id=tool_id, - file_path=file_path, - org_id=org_id, - document_id=document_id, - is_summary=is_summary, - reindex=True, - run_id=run_id, - user_id=user_id, - process_text=process_text, - ) - else: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.PERMANENT, - env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, - ) - doc_id = PromptStudioHelper.dynamic_indexer( - profile_manager=default_profile, - tool_id=tool_id, - file_path=file_path, - org_id=org_id, - document_id=document_id, - is_summary=is_summary, - reindex=True, - run_id=run_id, - user_id=user_id, - process_text=process_text, - fs=fs_instance, - ) + + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.PERMANENT, + env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, + ) + doc_id = PromptStudioHelper.dynamic_indexer( + profile_manager=default_profile, + tool_id=tool_id, + file_path=file_path, + org_id=org_id, + document_id=document_id, + is_summary=is_summary, + reindex=True, + run_id=run_id, + user_id=user_id, + process_text=process_text, + fs=fs_instance, + ) elapsed_time = time.time() - start_time logger.info( @@ -655,40 +628,24 @@ def _execute_prompts_in_single_pass( @staticmethod def _get_document_path(org_id, user_id, tool_id, doc_name): - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - doc_path = FileManagerHelper.handle_sub_directory_for_tenants( - org_id=org_id, - user_id=user_id, - tool_id=tool_id, - is_create=False, - ) - else: - doc_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory( - org_id=org_id, - user_id=user_id, - tool_id=tool_id, - is_create=False, - ) + doc_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory( + org_id=org_id, + user_id=user_id, + tool_id=tool_id, + is_create=False, + ) return str(Path(doc_path) / doc_name) @staticmethod def _get_extract_or_summary_document_path( org_id, user_id, tool_id, doc_name, doc_type ) -> str: - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - doc_path = FileManagerHelper.handle_sub_directory_for_tenants( - org_id=org_id, - user_id=user_id, - tool_id=tool_id, - is_create=False, - ) - else: - doc_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory( - org_id=org_id, - user_id=user_id, - tool_id=tool_id, - is_create=False, - ) + doc_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory( + org_id=org_id, + user_id=user_id, + tool_id=tool_id, + is_create=False, + ) extracted_doc_name = Path(doc_name).stem + TSPKeys.TXT_EXTENTION return str(Path(doc_path) / doc_type / extracted_doc_name) @@ -789,35 +746,22 @@ def _fetch_response( x2text = str(profile_manager.x2text.id) if not profile_manager: raise DefaultProfileError() - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - index_result = PromptStudioHelper.dynamic_indexer( - profile_manager=profile_manager, - file_path=doc_path, - tool_id=str(tool.tool_id), - org_id=org_id, - document_id=document_id, - is_summary=tool.summarize_as_source, - run_id=run_id, - user_id=user_id, - process_text=process_text, - ) - 
else: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.PERMANENT, - env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, - ) - index_result = PromptStudioHelper.dynamic_indexer( - profile_manager=profile_manager, - file_path=doc_path, - tool_id=str(tool.tool_id), - org_id=org_id, - document_id=document_id, - is_summary=tool.summarize_as_source, - run_id=run_id, - user_id=user_id, - process_text=process_text, - fs=fs_instance, - ) + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.PERMANENT, + env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, + ) + index_result = PromptStudioHelper.dynamic_indexer( + profile_manager=profile_manager, + file_path=doc_path, + tool_id=str(tool.tool_id), + org_id=org_id, + document_id=document_id, + is_summary=tool.summarize_as_source, + run_id=run_id, + user_id=user_id, + process_text=process_text, + fs=fs_instance, + ) if index_result.get("status") == IndexingStatus.PENDING_STATUS.value: return { "status": IndexingStatus.PENDING_STATUS.value, @@ -888,10 +832,7 @@ def _fetch_response( tool_settings[TSPKeys.PLATFORM_POSTAMBLE] = getattr( settings, TSPKeys.PLATFORM_POSTAMBLE.upper(), "" ) - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_hash = ToolUtils.get_hash_from_file(file_path=doc_path) - else: - file_hash = fs_instance.get_hash_from_file(path=doc_path) + file_hash = fs_instance.get_hash_from_file(path=doc_path) payload = { TSPKeys.TOOL_SETTINGS: tool_settings, @@ -1012,27 +953,16 @@ def dynamic_indexer( usage_kwargs["file_name"] = filename util = PromptIdeBaseTool(log_level=LogLevel.INFO, org_id=org_id) tool_index = Index(tool=util) - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - doc_id_key = tool_index.generate_index_key( - vector_db=vector_db, - embedding=embedding_model, - x2text=x2text_adapter, - chunk_size=str(profile_manager.chunk_size), - chunk_overlap=str(profile_manager.chunk_overlap), - file_path=file_path, - file_hash=None, - ) - else: - doc_id_key = tool_index.generate_index_key( - vector_db=vector_db, - embedding=embedding_model, - x2text=x2text_adapter, - chunk_size=str(profile_manager.chunk_size), - chunk_overlap=str(profile_manager.chunk_overlap), - file_path=file_path, - file_hash=None, - fs=fs, - ) + doc_id_key = tool_index.generate_index_key( + vector_db=vector_db, + embedding=embedding_model, + x2text=x2text_adapter, + chunk_size=str(profile_manager.chunk_size), + chunk_overlap=str(profile_manager.chunk_overlap), + file_path=file_path, + file_hash=None, + fs=fs, + ) if not reindex: indexed_doc_id = DocumentIndexingService.get_indexed_document_id( org_id=org_id, user_id=user_id, doc_id_key=doc_id_key @@ -1055,35 +985,20 @@ def dynamic_indexer( DocumentIndexingService.set_document_indexing( org_id=org_id, user_id=user_id, doc_id_key=doc_id_key ) - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - doc_id: str = tool_index.index( - tool_id=tool_id, - embedding_instance_id=embedding_model, - vector_db_instance_id=vector_db, - x2text_instance_id=x2text_adapter, - file_path=file_path, - chunk_size=profile_manager.chunk_size, - chunk_overlap=profile_manager.chunk_overlap, - reindex=reindex, - output_file_path=extract_file_path, - usage_kwargs=usage_kwargs.copy(), - process_text=process_text, - ) - else: - doc_id: str = tool_index.index( - tool_id=tool_id, - embedding_instance_id=embedding_model, - vector_db_instance_id=vector_db, - x2text_instance_id=x2text_adapter, - file_path=file_path, - chunk_size=profile_manager.chunk_size, - 
chunk_overlap=profile_manager.chunk_overlap, - reindex=reindex, - output_file_path=extract_file_path, - usage_kwargs=usage_kwargs.copy(), - process_text=process_text, - fs=fs, - ) + doc_id: str = tool_index.index( + tool_id=tool_id, + embedding_instance_id=embedding_model, + vector_db_instance_id=vector_db, + x2text_instance_id=x2text_adapter, + file_path=file_path, + chunk_size=profile_manager.chunk_size, + chunk_overlap=profile_manager.chunk_overlap, + reindex=reindex, + output_file_path=extract_file_path, + usage_kwargs=usage_kwargs.copy(), + process_text=process_text, + fs=fs, + ) PromptStudioIndexHelper.handle_index_manager( document_id=document_id, @@ -1145,35 +1060,22 @@ def _fetch_single_pass_response( if not default_profile: raise DefaultProfileError() - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - index_result = PromptStudioHelper.dynamic_indexer( - profile_manager=default_profile, - file_path=file_path, - tool_id=tool_id, - org_id=org_id, - is_summary=tool.summarize_as_source, - document_id=document_id, - run_id=run_id, - user_id=user_id, - process_text=process_text, - ) - else: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.PERMANENT, - env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, - ) - index_result = PromptStudioHelper.dynamic_indexer( - profile_manager=default_profile, - file_path=file_path, - tool_id=tool_id, - org_id=org_id, - is_summary=tool.summarize_as_source, - document_id=document_id, - run_id=run_id, - user_id=user_id, - process_text=process_text, - fs=fs_instance, - ) + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.PERMANENT, + env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, + ) + index_result = PromptStudioHelper.dynamic_indexer( + profile_manager=default_profile, + file_path=file_path, + tool_id=tool_id, + org_id=org_id, + is_summary=tool.summarize_as_source, + document_id=document_id, + run_id=run_id, + user_id=user_id, + process_text=process_text, + fs=fs_instance, + ) if index_result.get("status") == IndexingStatus.PENDING_STATUS.value: return { "status": IndexingStatus.PENDING_STATUS.value, @@ -1214,10 +1116,7 @@ def _fetch_single_pass_response( if tool.summarize_as_source: path = Path(file_path) file_path = str(path.parent / TSPKeys.SUMMARIZE / (path.stem + ".txt")) - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_hash = ToolUtils.get_hash_from_file(file_path=file_path) - else: - file_hash = fs_instance.get_hash_from_file(path=file_path) + file_hash = fs_instance.get_hash_from_file(path=file_path) payload = { TSPKeys.TOOL_SETTINGS: tool_settings, diff --git a/backend/prompt_studio/prompt_studio_core_v2/views.py b/backend/prompt_studio/prompt_studio_core_v2/views.py index 4eaa67bdb..4f796590d 100644 --- a/backend/prompt_studio/prompt_studio_core_v2/views.py +++ b/backend/prompt_studio/prompt_studio_core_v2/views.py @@ -423,49 +423,15 @@ def fetch_contents_ide(self, request: HttpRequest, pk: Any = None) -> Response: f"{FileViewTypes.SUMMARIZE.lower()}/" f"{filename_without_extension}.txt" ) - - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - - file_path = file_path = FileManagerHelper.handle_sub_directory_for_tenants( - UserSessionUtils.get_organization_id(request), - is_create=True, + try: + contents = PromptStudioFileHelper.fetch_file_contents( + file_name=file_name, + org_id=UserSessionUtils.get_organization_id(request), user_id=custom_tool.created_by.user_id, tool_id=str(custom_tool.tool_id), ) - file_system = LocalStorageFS(settings={"path": 
file_path}) - if not file_path.endswith("/"): - file_path += "/" - file_path += file_name - # Temporary Hack for frictionless onboarding as the user id will be empty - try: - contents = FileManagerHelper.fetch_file_contents( - file_system, file_path, allowed_content_types - ) - except FileNotFound: - file_path = file_path = ( - FileManagerHelper.handle_sub_directory_for_tenants( - UserSessionUtils.get_organization_id(request), - is_create=True, - user_id="", - tool_id=str(custom_tool.tool_id), - ) - ) - if not file_path.endswith("/"): - file_path += "/" - file_path += file_name - contents = FileManagerHelper.fetch_file_contents( - file_system, file_path, allowed_content_types - ) - else: - try: - contents = PromptStudioFileHelper.fetch_file_contents( - file_name=file_name, - org_id=UserSessionUtils.get_organization_id(request), - user_id=custom_tool.created_by.user_id, - tool_id=str(custom_tool.tool_id), - ) - except FileNotFoundError: - raise FileNotFound() + except FileNotFoundError: + raise FileNotFound() return Response({"data": contents}, status=status.HTTP_200_OK) @action(detail=True, methods=["post"]) @@ -553,28 +519,12 @@ def delete_for_ide(self, request: HttpRequest, pk: uuid) -> Response: document.delete() # Delete the files file_name: str = document.document_name - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_path = FileManagerHelper.handle_sub_directory_for_tenants( - org_id=org_id, - is_create=False, - user_id=user_id, - tool_id=str(custom_tool.tool_id), - ) - path = file_path - file_system = LocalStorageFS(settings={"path": path}) - FileManagerHelper.delete_file(file_system, path, file_name) - # Directories to delete the text files - directories = ["extract/", "extract/metadata/", "summarize/"] - FileManagerHelper.delete_related_files( - file_system, path, file_name, directories - ) - else: - PromptStudioFileHelper.delete_for_ide( - org_id=org_id, - user_id=user_id, - tool_id=str(custom_tool.tool_id), - file_name=file_name, - ) + PromptStudioFileHelper.delete_for_ide( + org_id=org_id, + user_id=user_id, + tool_id=str(custom_tool.tool_id), + file_name=file_name, + ) return Response( {"data": "File deleted succesfully."}, status=status.HTTP_200_OK, diff --git a/backend/workflow_manager/endpoint_v2/base_connector.py b/backend/workflow_manager/endpoint_v2/base_connector.py index 7077be739..d7f55b76e 100644 --- a/backend/workflow_manager/endpoint_v2/base_connector.py +++ b/backend/workflow_manager/endpoint_v2/base_connector.py @@ -7,10 +7,8 @@ from utils.constants import Common from utils.user_context import UserContext -from backend.constants import FeatureFlag from unstract.connectors.filesystems import connectors from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem -from unstract.flags.feature_flag import check_feature_flag_status class BaseConnector(ExecutionFileHandler): @@ -25,13 +23,6 @@ def __init__( utilities. 
""" super().__init__(workflow_id, execution_id, organization_id) - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - if not (settings.API_STORAGE_DIR and settings.WORKFLOW_DATA_DIR): - raise ValueError("Missed env API_STORAGE_DIR or WORKFLOW_DATA_DIR") - # Directory path for storing execution-related files for API - self.api_storage_dir: str = self.create_execution_dir_path( - workflow_id, execution_id, organization_id, settings.API_STORAGE_DIR - ) def get_fsspec( self, settings: dict[str, Any], connector_id: str @@ -103,12 +94,7 @@ def get_api_storage_dir_path(cls, workflow_id: str, execution_id: str) -> str: str: The directory path for the execution. """ organization_id = UserContext.get_organization_identifier() - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - api_storage_dir: str = cls.get_api_execution_dir( - workflow_id, execution_id, organization_id - ) - else: - api_storage_dir: str = cls.create_execution_dir_path( - workflow_id, execution_id, organization_id, settings.API_STORAGE_DIR - ) + api_storage_dir: str = cls.get_api_execution_dir( + workflow_id, execution_id, organization_id + ) return api_storage_dir diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py index fe8e1dce6..05e1cbbc6 100644 --- a/backend/workflow_manager/endpoint_v2/destination.py +++ b/backend/workflow_manager/endpoint_v2/destination.py @@ -5,10 +5,7 @@ import os from typing import Any, Optional, Union -import fsspec -import magic from connector_v2.models import ConnectorInstance -from fsspec.implementations.local import LocalFileSystem from unstract.sdk.constants import ToolExecKey from unstract.sdk.file_storage.constants import FileOperationParams from unstract.sdk.tool.mime_types import EXT_MIME_MAP @@ -39,13 +36,9 @@ from workflow_manager.workflow_v2.models.workflow import Workflow from workflow_manager.workflow_v2.utils import WorkflowUtil -from backend.constants import FeatureFlag from backend.exceptions import UnstractFSException from unstract.connectors.exceptions import ConnectorError -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from unstract.filesystem import FileStorageType, FileSystem +from unstract.filesystem import FileStorageType, FileSystem logger = logging.getLogger(__name__) @@ -239,12 +232,10 @@ def copy_output_to_output_directory(self) -> None: destination_fs.create_dir_if_not_exists(input_dir=output_directory) # Traverse local directory and create the same structure in the # output_directory - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - fs = file_system.get_file_storage() - dir_path = fs.walk(str(destination_volume_path)) - else: - dir_path = os.walk(destination_volume_path) + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + fs = file_system.get_file_storage() + dir_path = fs.walk(str(destination_volume_path)) + for root, dirs, files in dir_path: for dir_name in dirs: current_dir = os.path.join( @@ -416,36 +407,7 @@ def get_result(self, file_history: Optional[FileHistory] = None) -> Optional[Any Returns: Union[dict[str, Any], str]: Result data. 
""" - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - return self.get_result_with_file_storage(file_history=file_history) - if file_history and file_history.result: - return self.parse_string(file_history.result) - output_file = os.path.join(self.execution_dir, WorkflowFileType.INFILE) - metadata: dict[str, Any] = self.get_workflow_metadata() - output_type = self.get_output_type(metadata) - result: Union[dict[str, Any], str] = "" - try: - # TODO: SDK handles validation; consider removing here. - mime = magic.Magic() - file_type = mime.from_file(output_file) - if output_type == ToolOutputType.JSON: - if "JSON" not in file_type: - logger.error(f"Output type json mismatched {file_type}") - raise ToolOutputTypeMismatch() - with open(output_file) as file: - result = json.load(file) - elif output_type == ToolOutputType.TXT: - if "JSON" in file_type: - logger.error(f"Output type txt mismatched {file_type}") - raise ToolOutputTypeMismatch() - with open(output_file) as file: - result = file.read() - result = result.encode("utf-8").decode("unicode-escape") - else: - raise InvalidToolOutputType() - except (FileNotFoundError, json.JSONDecodeError) as err: - logger.error(f"Error while getting result {err}") - return result + return self.get_result_with_file_storage(file_history=file_history) def get_result_with_file_storage( self, file_history: Optional[FileHistory] = None @@ -506,14 +468,10 @@ def delete_execution_directory(self) -> None: Returns: None """ - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - file_storage = file_system.get_file_storage() - if file_storage.exists(self.execution_dir): - file_storage.rm(self.execution_dir, recursive=True) - else: - fs: LocalFileSystem = fsspec.filesystem("file") - fs.rm(self.execution_dir, recursive=True) + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + file_storage = file_system.get_file_storage() + if file_storage.exists(self.execution_dir): + file_storage.rm(self.execution_dir, recursive=True) self.delete_api_storage_dir(self.workflow_id, self.execution_id) @classmethod @@ -526,14 +484,10 @@ def delete_api_storage_dir(cls, workflow_id: str, execution_id: str) -> None: api_storage_dir = cls.get_api_storage_dir_path( workflow_id=workflow_id, execution_id=execution_id ) - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.API_EXECUTION) - file_storage = file_system.get_file_storage() - if file_storage.exists(api_storage_dir): - file_storage.rm(api_storage_dir, recursive=True) - else: - fs: LocalFileSystem = fsspec.filesystem("file") - fs.rm(api_storage_dir, recursive=True) + file_system = FileSystem(FileStorageType.API_EXECUTION) + file_storage = file_system.get_file_storage() + if file_storage.exists(api_storage_dir): + file_storage.rm(api_storage_dir, recursive=True) @classmethod def create_endpoint_for_workflow( diff --git a/backend/workflow_manager/endpoint_v2/source.py b/backend/workflow_manager/endpoint_v2/source.py index 633d4db6e..5af7c1039 100644 --- a/backend/workflow_manager/endpoint_v2/source.py +++ b/backend/workflow_manager/endpoint_v2/source.py @@ -39,11 +39,7 @@ from workflow_manager.workflow_v2.file_history_helper import FileHistoryHelper from workflow_manager.workflow_v2.models.workflow import Workflow -from backend.constants import FeatureFlag -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): 
- from unstract.filesystem import FileStorageType, FileSystem +from unstract.filesystem import FileStorageType, FileSystem logger = logging.getLogger(__name__) @@ -508,17 +504,13 @@ def add_input_from_connector_to_volume(self, input_file_path: str) -> str: ) self.publish_input_file_content(input_file_path, input_log) - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - file_storage = file_system.get_file_storage() - file_storage.write(path=source_file_path, mode="wb", data=file_content) - file_storage.write(path=infile_path, mode="wb", data=file_content) - else: - with fsspec.open(source_file, "wb") as local_file: - local_file.write(file_content) + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + file_storage = file_system.get_file_storage() + file_storage.write(path=source_file_path, mode="wb", data=file_content) + file_storage.write(path=infile_path, mode="wb", data=file_content) - # Copy file to infile directory - self.copy_file_to_infile_dir(source_file_path, infile_path) + # Copy file to infile directory + self.copy_file_to_infile_dir(source_file_path, infile_path) logger.info(f"{input_file_path} is added to execution directory") return hash_value_of_file_content @@ -527,20 +519,17 @@ def add_input_from_api_storage_to_volume(self, input_file_path: str) -> None: """Add input file to execution directory from api storage.""" infile_path = os.path.join(self.execution_dir, WorkflowFileType.INFILE) source_path = os.path.join(self.execution_dir, WorkflowFileType.SOURCE) - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - api_file_system = FileSystem(FileStorageType.API_EXECUTION) - api_file_storage = api_file_system.get_file_storage() - workflow_file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - workflow_file_storage = workflow_file_system.get_file_storage() - self._copy_file_to_destination( - source_storage=api_file_storage, - destination_storage=workflow_file_storage, - source_path=input_file_path, - destination_paths=[infile_path, source_path], - ) - else: - shutil.copyfile(input_file_path, infile_path) - shutil.copyfile(input_file_path, source_path) + + api_file_system = FileSystem(FileStorageType.API_EXECUTION) + api_file_storage = api_file_system.get_file_storage() + workflow_file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + workflow_file_storage = workflow_file_system.get_file_storage() + self._copy_file_to_destination( + source_storage=api_file_storage, + destination_storage=workflow_file_storage, + source_path=input_file_path, + destination_paths=[infile_path, source_path], + ) # TODO: replace it with method from SDK Utils def _copy_file_to_destination( @@ -701,20 +690,13 @@ def add_input_file_to_api_storage( for file in file_objs: file_name = file.name destination_path = os.path.join(api_storage_dir, file_name) - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.API_EXECUTION) - file_storage = file_system.get_file_storage() - buffer = bytearray() - for chunk in file.chunks(): - buffer.extend(chunk) - file_storage.write(path=destination_path, mode="wb", data=buffer) - else: - os.makedirs(os.path.dirname(destination_path), exist_ok=True) - with open(destination_path, "wb") as f: - buffer = bytearray() - for chunk in file.chunks(): - buffer.extend(chunk) - f.write(buffer) + + file_system = FileSystem(FileStorageType.API_EXECUTION) + file_storage = file_system.get_file_storage() + buffer = bytearray() 
+            for chunk in file.chunks():
+                buffer.extend(chunk)
+            file_storage.write(path=destination_path, mode="wb", data=buffer)
 
             file_hash = cls.hash_str(buffer)
             connection_type = WorkflowEndpoint.ConnectionType.API
diff --git a/platform-service/src/unstract/platform_service/helper/cost_calculation.py b/platform-service/src/unstract/platform_service/helper/cost_calculation.py
index 53b5cf066..fe5f47562 100644
--- a/platform-service/src/unstract/platform_service/helper/cost_calculation.py
+++ b/platform-service/src/unstract/platform_service/helper/cost_calculation.py
@@ -1,21 +1,14 @@
 import json
 import os
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from typing import Any, Optional
 
 import requests
 from flask import current_app as app
-from unstract.platform_service.constants import FeatureFlag
 from unstract.platform_service.env import Env
 from unstract.platform_service.utils import format_float_positional
-
-from unstract.flags.feature_flag import check_feature_flag_status
-
-if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
-    from datetime import timezone
-
-    from unstract.sdk.exceptions import FileStorageError
-    from unstract.sdk.file_storage import EnvHelper, StorageType
+from unstract.sdk.exceptions import FileStorageError
+from unstract.sdk.file_storage import EnvHelper, StorageType
 
 
 class CostCalculationHelper:
@@ -29,25 +22,22 @@ def __init__(
         self.url = url
         self.file_path = file_path
 
-        if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
-            try:
-                self.file_storage = EnvHelper.get_storage(
-                    StorageType.PERMANENT, "FILE_STORAGE_CREDENTIALS"
-                )
-                self.file_path = os.environ.get("REMOTE_MODEL_PRICES_FILE_PATH")
-            except KeyError as e:
-                app.logger.error(
-                    f"Required credentials is missing in the env: {str(e)}"
-                )
-                raise e
-            except FileStorageError as e:
-                app.logger.error(
-                    "Error while initialising storage: %s",
-                    e,
-                    stack_info=True,
-                    exc_info=True,
-                )
-                raise e
+        try:
+            self.file_storage = EnvHelper.get_storage(
+                StorageType.PERMANENT, "FILE_STORAGE_CREDENTIALS"
+            )
+            self.file_path = os.environ.get("REMOTE_MODEL_PRICES_FILE_PATH")
+        except KeyError as e:
+            app.logger.error(f"Required credentials are missing in the env: {str(e)}")
+            raise e
+        except FileStorageError as e:
+            app.logger.error(
+                "Error while initialising storage: %s",
+                e,
+                stack_info=True,
+                exc_info=True,
+            )
+            raise e
 
         self.model_token_data = self._get_model_token_data()
 
@@ -58,10 +48,7 @@ def calculate_cost(
         item = None
 
         if not self.model_token_data:
-            if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
-                return json.loads(format_float_positional(cost))
-            else:
-                return format_float_positional(cost)
+            return json.loads(format_float_positional(cost))
         # Filter the model objects by model name
         filtered_models = {
             k: v for k, v in self.model_token_data.items() if k.endswith(model_name)
@@ -80,43 +67,25 @@ def calculate_cost(
 
     def _get_model_token_data(self) -> Optional[dict[str, Any]]:
         try:
-            if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
-                # File does not exist, fetch JSON data from API
-                if not self.file_storage.exists(self.file_path):
-                    return self._fetch_and_save_json()
-
-                file_mtime = self.file_storage.modification_time(self.file_path)
-                file_expiry_date = file_mtime + timedelta(days=self.ttl_days)
-                file_expiry_date_utc = file_expiry_date.replace(tzinfo=timezone.utc)
-                now_utc = datetime.now().replace(tzinfo=timezone.utc)
-
-                if now_utc < file_expiry_date_utc:
-                    app.logger.info(f"Reading model token data from 
{self.file_path}") - # File exists and TTL has not expired, read and return content - file_contents = self.file_storage.read( - self.file_path, mode="r", encoding="utf-8" - ) - return json.loads(file_contents) - else: - # TTL expired, fetch updated JSON data from API - return self._fetch_and_save_json() - else: - # File does not exist, fetch JSON data from API - if not os.path.exists(self.file_path): - return self._fetch_and_save_json() - - file_mtime = os.path.getmtime(self.file_path) - file_expiry_date = datetime.fromtimestamp(file_mtime) + timedelta( - days=self.ttl_days + # File does not exist, fetch JSON data from API + if not self.file_storage.exists(self.file_path): + return self._fetch_and_save_json() + + file_mtime = self.file_storage.modification_time(self.file_path) + file_expiry_date = file_mtime + timedelta(days=self.ttl_days) + file_expiry_date_utc = file_expiry_date.replace(tzinfo=timezone.utc) + now_utc = datetime.now().replace(tzinfo=timezone.utc) + + if now_utc < file_expiry_date_utc: + app.logger.info(f"Reading model token data from {self.file_path}") + # File exists and TTL has not expired, read and return content + file_contents = self.file_storage.read( + self.file_path, mode="r", encoding="utf-8" ) - if datetime.now() < file_expiry_date: - app.logger.info(f"Reading model token data from {self.file_path}") - # File exists and TTL has not expired, read and return content - with open(self.file_path, encoding="utf-8") as f: - return json.load(f) - else: - # TTL expired, fetch updated JSON data from API - return self._fetch_and_save_json() + return json.loads(file_contents) + else: + # TTL expired, fetch updated JSON data from API + return self._fetch_and_save_json() except Exception as e: app.logger.warning( "Error in calculate_cost: %s", e, stack_info=True, exc_info=True @@ -137,21 +106,12 @@ def _fetch_and_save_json(self) -> Optional[dict[str, Any]]: response.raise_for_status() json_data = response.json() # Save JSON data to file - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - self.file_storage.json_dump( - path=self.file_path, - data=json_data, - ensure_ascii=False, - indent=4, - ) - else: - with open(self.file_path, "w", encoding="utf-8") as f: - json.dump(json_data, f, ensure_ascii=False, indent=4) - # Set the file's modification time to indicate TTL - expiry_date = datetime.now() + timedelta(days=self.ttl_days) - expiry_timestamp = expiry_date.timestamp() - os.utime(self.file_path, (expiry_timestamp, expiry_timestamp)) - + self.file_storage.json_dump( + path=self.file_path, + data=json_data, + ensure_ascii=False, + indent=4, + ) app.logger.info( "File '%s' updated successfully with TTL set to %d days.", self.file_path, diff --git a/prompt-service/src/unstract/prompt_service/helper.py b/prompt-service/src/unstract/prompt_service/helper.py index 3797fcb52..7749baa09 100644 --- a/prompt-service/src/unstract/prompt_service/helper.py +++ b/prompt-service/src/unstract/prompt_service/helper.py @@ -10,7 +10,6 @@ from unstract.prompt_service.constants import ( DBTableV2, ExecutionSource, - FeatureFlag, FileStorageKeys, ) from unstract.prompt_service.constants import PromptServiceContants as PSKeys @@ -19,15 +18,11 @@ from unstract.prompt_service.exceptions import APIError, RateLimitError from unstract.sdk.exceptions import RateLimitError as SdkRateLimitError from unstract.sdk.exceptions import SdkError +from unstract.sdk.file_storage import FileStorage, FileStorageProvider +from unstract.sdk.file_storage.constants import StorageType +from 
unstract.sdk.file_storage.env_helper import EnvHelper from unstract.sdk.llm import LLM -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from unstract.sdk.file_storage import FileStorage, FileStorageProvider - from unstract.sdk.file_storage.constants import StorageType - from unstract.sdk.file_storage.env_helper import EnvHelper - PAID_FEATURE_MSG = ( "It is a cloud / enterprise feature. If you have purchased a plan and still " "face this issue, please contact support" @@ -305,27 +300,23 @@ def run_completion( ) highlight_data = None if highlight_data_plugin and enable_highlight: - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL) - if execution_source == ExecutionSource.IDE.value: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.PERMANENT, - env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, - ) - if execution_source == ExecutionSource.TOOL.value: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.TEMPORARY, - env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE, - ) - highlight_data = highlight_data_plugin["entrypoint_cls"]( - logger=current_app.logger, - file_path=file_path, - fs_instance=fs_instance, - ).run - else: - highlight_data = highlight_data_plugin["entrypoint_cls"]( - logger=current_app.logger, file_path=file_path - ).run + fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL) + if execution_source == ExecutionSource.IDE.value: + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.PERMANENT, + env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, + ) + if execution_source == ExecutionSource.TOOL.value: + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.TEMPORARY, + env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE, + ) + highlight_data = highlight_data_plugin["entrypoint_cls"]( + logger=current_app.logger, + file_path=file_path, + fs_instance=fs_instance, + ).run + completion = llm.complete( prompt=prompt, process_text=highlight_data, @@ -366,32 +357,24 @@ def extract_table( "Unable to extract table details. " "Please contact admin to resolve this issue." 
) - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL) - if execution_source == ExecutionSource.IDE.value: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.PERMANENT, - env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, - ) - if execution_source == ExecutionSource.TOOL.value: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.TEMPORARY, - env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE, - ) + fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL) + if execution_source == ExecutionSource.IDE.value: + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.PERMANENT, + env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, + ) + if execution_source == ExecutionSource.TOOL.value: + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.TEMPORARY, + env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE, + ) try: - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - answer = table_extractor["entrypoint_cls"].extract_large_table( - llm=llm, - table_settings=table_settings, - enforce_type=enforce_type, - fs_instance=fs_instance, - ) - else: - answer = table_extractor["entrypoint_cls"].extract_large_table( - llm=llm, - table_settings=table_settings, - enforce_type=enforce_type, - ) + answer = table_extractor["entrypoint_cls"].extract_large_table( + llm=llm, + table_settings=table_settings, + enforce_type=enforce_type, + fs_instance=fs_instance, + ) structured_output[output[PSKeys.NAME]] = answer # We do not support summary and eval for table. # Hence returning the result @@ -426,32 +409,23 @@ def extract_line_item( ) # Read file content into context - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL) - if execution_source == ExecutionSource.IDE.value: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.PERMANENT, - env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, - ) - if execution_source == ExecutionSource.TOOL.value: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.TEMPORARY, - env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE, - ) - - if not fs_instance.exists(extract_file_path): - raise FileNotFoundError( - f"The file at path '{extract_file_path}' does not exist." - ) - context = fs_instance.read(path=extract_file_path, encoding="utf-8", mode="rb") - else: - if not os.path.exists(extract_file_path): - raise FileNotFoundError( - f"The file at path '{extract_file_path}' does not exist." - ) + fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL) + if execution_source == ExecutionSource.IDE.value: + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.PERMANENT, + env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, + ) + if execution_source == ExecutionSource.TOOL.value: + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.TEMPORARY, + env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE, + ) - with open(extract_file_path, encoding="utf-8") as file: - context = file.read() + if not fs_instance.exists(extract_file_path): + raise FileNotFoundError( + f"The file at path '{extract_file_path}' does not exist." 
+ ) + context = fs_instance.read(path=extract_file_path, encoding="utf-8", mode="rb") prompt = construct_prompt( preamble=tool_settings.get(PSKeys.PREAMBLE, ""), diff --git a/runner/src/unstract/runner/clients/docker.py b/runner/src/unstract/runner/clients/docker.py index 5676423b5..58f99a463 100644 --- a/runner/src/unstract/runner/clients/docker.py +++ b/runner/src/unstract/runner/clients/docker.py @@ -9,12 +9,11 @@ ContainerClientInterface, ContainerInterface, ) -from unstract.runner.constants import Env, FeatureFlag +from unstract.runner.constants import Env from unstract.runner.utils import Utils from docker import DockerClient from unstract.core.utilities import UnstractUtils -from unstract.flags.feature_flag import check_feature_flag_status class DockerContainer(ContainerInterface): @@ -170,20 +169,6 @@ def get_container_run_config( envs = {} mounts = [] if organization_id and workflow_id and execution_id: - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - source_path = os.path.join( - os.getenv(Env.WORKFLOW_DATA_DIR, ""), - organization_id, - workflow_id, - execution_id, - ) - mounts.append( - { - "type": "bind", - "source": source_path, - "target": os.getenv(Env.TOOL_DATA_DIR, "/data"), - } - ) envs[Env.EXECUTION_RUN_DATA_FOLDER] = os.path.join( os.getenv(Env.EXECUTION_RUN_DATA_FOLDER_PREFIX, ""), organization_id, diff --git a/runner/src/unstract/runner/runner.py b/runner/src/unstract/runner/runner.py index eda4c6528..ce1b1dc2e 100644 --- a/runner/src/unstract/runner/runner.py +++ b/runner/src/unstract/runner/runner.py @@ -11,12 +11,11 @@ ContainerClientInterface, ContainerInterface, ) -from unstract.runner.constants import Env, FeatureFlag, LogLevel, LogType, ToolKey +from unstract.runner.constants import Env, LogLevel, LogType, ToolKey from unstract.runner.exception import ToolRunException from unstract.core.constants import LogFieldName from unstract.core.pubsub_helper import LogPublisher -from unstract.flags.feature_flag import check_feature_flag_status load_dotenv() # Loads the container clinet class. 
@@ -186,19 +185,16 @@ def run_container( Returns: Optional[Any]: _description_ """ - tool_data_dir = os.getenv(Env.TOOL_DATA_DIR, "/data") - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - envs[Env.EXECUTION_DATA_DIR] = os.path.join( - os.getenv(Env.WORKFLOW_EXECUTION_DIR_PREFIX, ""), - organization_id, - workflow_id, - execution_id, - ) - envs[Env.WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS] = os.getenv( - Env.WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS, "{}" - ) - else: - envs[Env.TOOL_DATA_DIR] = tool_data_dir + + envs[Env.EXECUTION_DATA_DIR] = os.path.join( + os.getenv(Env.WORKFLOW_EXECUTION_DIR_PREFIX, ""), + organization_id, + workflow_id, + execution_id, + ) + envs[Env.WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS] = os.getenv( + Env.WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS, "{}" + ) container_config = self.client.get_container_run_config( command=[ diff --git a/tools/classifier/src/helper.py b/tools/classifier/src/helper.py index 66179d685..5f0dd699c 100644 --- a/tools/classifier/src/helper.py +++ b/tools/classifier/src/helper.py @@ -1,5 +1,4 @@ import re -import shutil from pathlib import Path from typing import Any, Optional @@ -67,20 +66,13 @@ def copy_source_to_output_bin( """ try: output_folder_bin = Path(self.output_dir) / classification - if self.tool.workflow_filestorage: - output_file = output_folder_bin / source_name - self._copy_file( - source_fs=self.tool.workflow_filestorage, - destination_fs=self.tool.workflow_filestorage, - source_path=source_file, - destination_path=str(output_file), - ) - else: - if not output_folder_bin.is_dir(): - output_folder_bin.mkdir(parents=True, exist_ok=True) - - output_file = output_folder_bin / source_name - shutil.copyfile(source_file, output_file) + output_file = output_folder_bin / source_name + self._copy_file( + source_fs=self.tool.workflow_filestorage, + destination_fs=self.tool.workflow_filestorage, + source_path=source_file, + destination_path=str(output_file), + ) except Exception as e: self.tool.stream_error_and_exit(f"Error creating output file: {e}") @@ -150,16 +142,11 @@ def _extract_from_adapter(self, file: str, adapter_id: str) -> Optional[str]: self.tool.stream_log("Text extraction adapter has been created successfully.") try: - if self.tool.workflow_filestorage: - extraction_result: TextExtractionResult = x2text.process( - input_file_path=file, - fs=self.tool.workflow_filestorage, - tags=self.tool.tags, - ) - else: - extraction_result: TextExtractionResult = x2text.process( - input_file_path=file, tags=self.tool.tags - ) + extraction_result: TextExtractionResult = x2text.process( + input_file_path=file, + fs=self.tool.workflow_filestorage, + tags=self.tool.tags, + ) extracted_text: str = extraction_result.extracted_text return extracted_text except Exception as e: @@ -176,13 +163,9 @@ def _extract_from_file(self, file: str) -> Optional[str]: """ self.tool.stream_log("Extracting text from file") try: - if self.tool.workflow_filestorage: - text = self.tool.workflow_filestorage.read(path=file, mode="rb").decode( - "utf-8" - ) - else: - with open(file, "rb") as f: - text = f.read().decode("utf-8") + text = self.tool.workflow_filestorage.read(path=file, mode="rb").decode( + "utf-8" + ) except Exception as e: self.tool.stream_log(f"File error: {e}") return None diff --git a/tools/structure/src/main.py b/tools/structure/src/main.py index 6a6149ef6..daf7014f1 100644 --- a/tools/structure/src/main.py +++ b/tools/structure/src/main.py @@ -11,7 +11,6 @@ from unstract.sdk.prompt import PromptTool from 
unstract.sdk.tool.base import BaseTool from unstract.sdk.tool.entrypoint import ToolEntrypoint -from unstract.sdk.utils import ToolUtils from utils import json_to_markdown logger = logging.getLogger(__name__) @@ -94,10 +93,7 @@ def run( _, file_name = os.path.split(input_file) if summarize_as_source: file_name = SettingsKeys.SUMMARIZE - if self.workflow_filestorage: - tool_data_dir = Path(self.get_env_or_die(ToolEnv.EXECUTION_DATA_DIR)) - else: - tool_data_dir = Path(self.get_env_or_die(SettingsKeys.TOOL_DATA_DIR)) + tool_data_dir = Path(self.get_env_or_die(ToolEnv.EXECUTION_DATA_DIR)) execution_run_data_folder = Path( self.get_env_or_die(SettingsKeys.EXECUTION_RUN_DATA_FOLDER) ) @@ -150,11 +146,7 @@ def run( usage_kwargs=usage_kwargs, process_text=process_text, tags=self.tags, - **( - {"fs": self.workflow_filestorage} - if self.workflow_filestorage is not None - else {} - ), + **({"fs": self.workflow_filestorage}), ) index_metrics = {SettingsKeys.INDEXING: index.get_metrics()} if summarize_as_source: @@ -193,11 +185,7 @@ def run( usage_kwargs=usage_kwargs, process_text=process_text, tags=self.tags, - **( - {"fs": self.workflow_filestorage} - if self.workflow_filestorage is not None - else {} - ), + **({"fs": self.workflow_filestorage}), ) index_metrics[output[SettingsKeys.NAME]] = { SettingsKeys.INDEXING: index.get_metrics() @@ -286,13 +274,9 @@ def run( try: self.stream_log("Writing parsed output...") output_path = Path(output_dir) / f"{Path(self.source_file_name).stem}.json" - if self.workflow_filestorage: - self.workflow_filestorage.json_dump( - path=output_path, data=structured_output_dict - ) - else: - with open(output_path, "w", encoding="utf-8") as f: - f.write(structured_output) + self.workflow_filestorage.json_dump( + path=output_path, data=structured_output_dict + ) except OSError as e: self.stream_error_and_exit(f"Error creating output file: {e}") except json.JSONDecodeError as e: @@ -333,23 +317,13 @@ def _summarize_and_index( summarize_file_path = tool_data_dir / SettingsKeys.SUMMARIZE summarized_context = "" - if self.workflow_filestorage: - if self.workflow_filestorage.exists(summarize_file_path): - summarized_context = self.workflow_filestorage.read( - path=summarize_file_path, mode="r" - ) - elif summarize_file_path.exists(): - with open(summarize_file_path, encoding="utf-8") as f: - summarized_context = f.read() + if self.workflow_filestorage.exists(summarize_file_path): + summarized_context = self.workflow_filestorage.read( + path=summarize_file_path, mode="r" + ) if not summarized_context: context = "" - if self.workflow_filestorage: - context = self.workflow_filestorage.read( - path=extract_file_path, mode="r" - ) - else: - with open(extract_file_path, encoding="utf-8") as file: - context = file.read() + context = self.workflow_filestorage.read(path=extract_file_path, mode="r") prompt_keys = [] for output in outputs: prompt_keys.append(output[SettingsKeys.NAME]) @@ -374,23 +348,14 @@ def _summarize_and_index( structure_output = json.loads(response[SettingsKeys.STRUCTURE_OUTPUT]) summarized_context = structure_output.get(SettingsKeys.DATA, "") self.stream_log("Writing summarized context to a file") - if self.workflow_filestorage: - self.workflow_filestorage.write( - path=summarize_file_path, mode="w", data=summarized_context - ) - else: - with open(summarize_file_path, "w", encoding="utf-8") as f: - f.write(summarized_context) + self.workflow_filestorage.write( + path=summarize_file_path, mode="w", data=summarized_context + ) self.stream_log("Indexing summarized 
context") - if self.workflow_filestorage: - summarize_file_hash: str = self.workflow_filestorage.get_hash_from_file( - path=summarize_file_path - ) - else: - summarize_file_hash: str = ToolUtils.get_hash_from_file( - file_path=summarize_file_path - ) + summarize_file_hash: str = self.workflow_filestorage.get_hash_from_file( + path=summarize_file_path + ) index.index( tool_id=tool_id, embedding_instance_id=embedding_instance_id, @@ -402,11 +367,7 @@ def _summarize_and_index( chunk_overlap=0, usage_kwargs=usage_kwargs, tags=self.tags, - **( - {"fs": self.workflow_filestorage} - if self.workflow_filestorage is not None - else {} - ), + **({"fs": self.workflow_filestorage}), ) return summarize_file_hash diff --git a/tools/text_extractor/src/main.py b/tools/text_extractor/src/main.py index 65f3e916e..4315ab4cd 100644 --- a/tools/text_extractor/src/main.py +++ b/tools/text_extractor/src/main.py @@ -63,14 +63,9 @@ def run( usage_kwargs=usage_kwargs, ) self.stream_log("Text extraction adapter has been created successfully.") - if self.workflow_filestorage: - extraction_result: TextExtractionResult = text_extraction_adapter.process( - input_file_path=input_file, fs=self.workflow_filestorage, tags=self.tags - ) - else: - extraction_result: TextExtractionResult = text_extraction_adapter.process( - input_file_path=input_file, tags=self.tags - ) + extraction_result: TextExtractionResult = text_extraction_adapter.process( + input_file_path=input_file, fs=self.workflow_filestorage, tags=self.tags + ) extracted_text = self.convert_to_actual_string(extraction_result.extracted_text) self.stream_log("Text has been extracted successfully.") @@ -85,13 +80,9 @@ def run( output_path = ( Path(output_dir) / f"{Path(self.source_file_name).stem}.txt" ) - if self.workflow_filestorage: - self.workflow_filestorage.write( - path=output_path, mode="w", data=extracted_text - ) - else: - with open(output_path, "w", encoding="utf-8") as file: - file.write(extracted_text) + self.workflow_filestorage.write( + path=output_path, mode="w", data=extracted_text + ) self.stream_log("Tool output written successfully.") else: diff --git a/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py b/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py index c9898343e..e4cc47ab1 100644 --- a/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py +++ b/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py @@ -5,13 +5,9 @@ import azure.core.exceptions as AzureException from adlfs import AzureBlobFileSystem -from backend.constants import FeatureFlag from unstract.connectors.exceptions import AzureHttpError, ConnectorError from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from unstract.filesystem import FileStorageType, FileSystem +from unstract.filesystem import FileStorageType, FileSystem logging.getLogger("azurefs").setLevel(logging.ERROR) logger = logging.getLogger(__name__) @@ -97,13 +93,9 @@ def upload_file_to_storage(self, source_path: str, destination_path: str) -> Non normalized_path = os.path.normpath(destination_path) destination_connector_fs = self.get_fsspec_fs() try: - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = 
FileSystem(FileStorageType.WORKFLOW_EXECUTION) - workflow_fs = file_system.get_file_storage() - data = workflow_fs.read(path=source_path, mode="rb") - else: - with open(source_path, "rb") as source_file: - data = source_file.read() + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + workflow_fs = file_system.get_file_storage() + data = workflow_fs.read(path=source_path, mode="rb") destination_connector_fs.write_bytes(normalized_path, data) except AzureException.HttpResponseError as e: self.raise_http_exception(e=e, path=normalized_path) diff --git a/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py b/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py index 51542357d..b2162420c 100644 --- a/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py +++ b/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py @@ -5,13 +5,9 @@ from fsspec import AbstractFileSystem -from backend.constants import FeatureFlag from unstract.connectors.base import UnstractConnector from unstract.connectors.enums import ConnectorMode -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from unstract.filesystem import FileStorageType, FileSystem +from unstract.filesystem import FileStorageType, FileSystem logger = logging.getLogger(__name__) @@ -106,11 +102,7 @@ def upload_file_to_storage(self, source_path: str, destination_path: str) -> Non """ normalized_path = os.path.normpath(destination_path) destination_connector_fs = self.get_fsspec_fs() - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - workflow_fs = file_system.get_file_storage() - data = workflow_fs.read(path=source_path, mode="rb") - else: - with open(source_path, "rb") as source_file: - data = source_file.read() + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + workflow_fs = file_system.get_file_storage() + data = workflow_fs.read(path=source_path, mode="rb") destination_connector_fs.write_bytes(normalized_path, data) diff --git a/unstract/tool-registry/src/unstract/tool_registry/tool_registry.py b/unstract/tool-registry/src/unstract/tool_registry/tool_registry.py index 4a060a3a0..74bbc5118 100644 --- a/unstract/tool-registry/src/unstract/tool_registry/tool_registry.py +++ b/unstract/tool-registry/src/unstract/tool_registry/tool_registry.py @@ -3,24 +3,15 @@ import os from typing import Any, Optional -from unstract.tool_registry.constants import ( - FeatureFlag, - PropKey, - ToolJsonField, - ToolKey, -) +from unstract.sdk.exceptions import FileStorageError +from unstract.sdk.file_storage import FileStorageProvider, PermanentFileStorage +from unstract.tool_registry.constants import PropKey, ToolJsonField, ToolKey from unstract.tool_registry.dto import Tool from unstract.tool_registry.exceptions import InvalidToolURLException from unstract.tool_registry.helper import ToolRegistryHelper from unstract.tool_registry.schema_validator import JsonSchemaValidator from unstract.tool_registry.tool_utils import ToolUtils -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from unstract.sdk.exceptions import FileStorageError - from unstract.sdk.file_storage import FileStorageProvider, PermanentFileStorage - logger = logging.getLogger(__name__) @@ -58,10 +49,8 @@ def __init__( "registry JSONs and 
YAML to a directory and set the env."
             )
-        if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
-            self.fs = self._get_storage_credentials()
-        else:
-            self.fs = None
+        self.fs = self._get_storage_credentials()
+
         self.helper = ToolRegistryHelper(
             registry=os.path.join(directory, registry_file),
             private_tools_file=os.path.join(directory, private_tools),
@@ -69,30 +58,28 @@ def __init__(
             fs=self.fs,
         )
 
-    if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
-
-        def _get_storage_credentials(self) -> PermanentFileStorage:
-            try:
-                # Not creating constants for now for the keywords below as this
-                # logic ought to change in the near future to maintain unformity
-                # across services
-                file_storage = json.loads(
-                    os.environ.get("TOOL_REGISTRY_STORAGE_CREDENTIALS", {})
-                )
-                provider = FileStorageProvider(file_storage["provider"])
-                credentials = file_storage.get("credentials", {})
-                return PermanentFileStorage(provider, **credentials)
-            except KeyError as e:
-                logger.error(f"Required credentials is missing in the env: {str(e)}")
-                raise e
-            except FileStorageError as e:
-                logger.error(
-                    "Error while initialising storage: %s",
-                    e,
-                    stack_info=True,
-                    exc_info=True,
-                )
-                raise e
+    def _get_storage_credentials(self) -> PermanentFileStorage:
+        try:
+            # Not creating constants for now for the keywords below as this
+            # logic ought to change in the near future to maintain uniformity
+            # across services
+            file_storage = json.loads(
+                os.environ.get("TOOL_REGISTRY_STORAGE_CREDENTIALS", "{}")
+            )
+            provider = FileStorageProvider(file_storage["provider"])
+            credentials = file_storage.get("credentials", {})
+            return PermanentFileStorage(provider, **credentials)
+        except KeyError as e:
+            logger.error(f"Required credentials are missing in the env: {str(e)}")
+            raise e
+        except FileStorageError as e:
+            logger.error(
+                "Error while initialising storage: %s",
+                e,
+                stack_info=True,
+                exc_info=True,
+            )
+            raise e
 
     def load_all_tools_to_disk(self) -> None:
         self.helper.load_all_tools_to_disk()
diff --git a/unstract/tool-registry/src/unstract/tool_registry/tool_utils.py b/unstract/tool-registry/src/unstract/tool_registry/tool_utils.py
index f2b7f7cf7..916fdb76f 100644
--- a/unstract/tool-registry/src/unstract/tool_registry/tool_utils.py
+++ b/unstract/tool-registry/src/unstract/tool_registry/tool_utils.py
@@ -3,15 +3,12 @@
 import re
 from typing import Any, Optional
 
-import yaml
 from unstract.sdk.adapters.enums import AdapterTypes
 from unstract.sdk.file_storage import FileStorage, FileStorageProvider
-from unstract.tool_registry.constants import AdapterPropertyKey, FeatureFlag, Tools
+from unstract.tool_registry.constants import AdapterPropertyKey, Tools
 from unstract.tool_registry.dto import AdapterProperties, Spec, Tool, ToolMeta
 from unstract.tool_registry.exceptions import InvalidToolURLException, RegistryNotFound
 
-from unstract.flags.feature_flag import check_feature_flag_status
-
 logger = logging.getLogger(__name__)
 
 
@@ -63,22 +60,14 @@ def save_tools_in_to_disk(
         data: dict[str, Any],
         fs: FileStorage = FileStorage(FileStorageProvider.LOCAL),
     ) -> None:
-        if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
-            fs.json_dump(path=file_path, mode="w", encoding="utf-8", data=data)
-        else:
-            with open(file_path, "w") as json_file:
-                json.dump(data, json_file)
+        fs.json_dump(path=file_path, mode="w", encoding="utf-8", data=data)
 
     @staticmethod
     def get_all_tools_from_disk(
         file_path: str, fs: FileStorage = FileStorage(FileStorageProvider.LOCAL)
     ) -> dict[str, Any]:
         try:
-            if 
check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - data = fs.json_load(file_path) - else: - with open(file_path) as json_file: - data: dict[str, Any] = json.load(json_file) + data = fs.json_load(file_path) return data except json.JSONDecodeError as e: logger.warning(f"Error loading tools from {file_path}: {e}") @@ -90,11 +79,7 @@ def save_registry( data: dict[str, Any], fs: FileStorage = FileStorage(FileStorageProvider.LOCAL), ) -> None: - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - fs.yaml_dump(path=file_path, mode="w", encoding="utf-8", data=data) - else: - with open(file_path, "w") as file: - yaml.dump(data, file, default_flow_style=False) + fs.yaml_dump(path=file_path, mode="w", encoding="utf-8", data=data) @staticmethod def get_registry( @@ -114,16 +99,12 @@ def get_registry( yml_data: dict[str, Any] = {} try: logger.debug(f"Reading tool registry YAML: {file_path}") - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - yml_data = fs.yaml_load(file_path) - else: - with open(file_path) as file: - yml_data = yaml.safe_load(file) + yml_data = fs.yaml_load(file_path) + except FileNotFoundError: logger.warning(f"Could not find tool registry YAML: {str(file_path)}") if raise_exc: raise RegistryNotFound() - pass except Exception as error: logger.error(f"Error while loading {str(file_path)}: {error}") if raise_exc: diff --git a/unstract/workflow-execution/src/unstract/workflow_execution/execution_file_handler.py b/unstract/workflow-execution/src/unstract/workflow_execution/execution_file_handler.py index 2c699b94d..07c9c8812 100644 --- a/unstract/workflow-execution/src/unstract/workflow_execution/execution_file_handler.py +++ b/unstract/workflow-execution/src/unstract/workflow_execution/execution_file_handler.py @@ -4,9 +4,7 @@ from pathlib import Path from typing import Any, Optional -import fsspec from unstract.workflow_execution.constants import ( - FeatureFlag, MetaDataKey, ToolMetadataKey, ToolOutputType, @@ -16,10 +14,7 @@ from unstract.workflow_execution.exceptions import ToolMetadataNotFound from unstract.workflow_execution.tools_utils import ToolsUtils -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from unstract.filesystem import FileStorageType, FileSystem +from unstract.filesystem import FileStorageType, FileSystem logger = logging.getLogger(__name__) @@ -31,14 +26,9 @@ def __init__( self.organization_id = organization_id self.workflow_id = workflow_id self.execution_id = execution_id - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - self.execution_dir = self.get_execution_dir( - workflow_id, execution_id, organization_id - ) - else: - self.execution_dir = self.create_execution_dir_path( - workflow_id, execution_id, organization_id - ) + self.execution_dir = self.get_execution_dir( + workflow_id, execution_id, organization_id + ) self.source_file = os.path.join(self.execution_dir, WorkflowFileType.SOURCE) self.infile = os.path.join(self.execution_dir, WorkflowFileType.INFILE) self.metadata_file = os.path.join( @@ -51,14 +41,10 @@ def get_workflow_metadata(self) -> dict[str, Any]: Returns: dict[str, Any]: Workflow metadata. 
""" - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - file_storage = file_system.get_file_storage() - metadata_content = file_storage.read(path=self.metadata_file, mode="r") - metadata = json.loads(metadata_content) - else: - with open(self.metadata_file) as file: - metadata: dict[str, Any] = json.load(file) + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + file_storage = file_system.get_file_storage() + metadata_content = file_storage.read(path=self.metadata_file, mode="r") + metadata = json.loads(metadata_content) return metadata def get_list_of_tool_metadata( @@ -138,13 +124,9 @@ def add_metadata_to_volume( MetaDataKey.FILE_EXECUTION_ID: str(file_execution_id), MetaDataKey.TAGS: tags, } - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - file_storage = file_system.get_file_storage() - file_storage.json_dump(path=metadata_path, data=content) - else: - with fsspec.open(f"file://{metadata_path}", "w") as local_file: - json.dump(content, local_file) + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + file_storage = file_system.get_file_storage() + file_storage.json_dump(path=metadata_path, data=content) logger.info( f"metadata for {input_file_path} is " "added in to execution directory" From f2629ec89f71bcb951bc0b3cfad29b068c2aa7e4 Mon Sep 17 00:00:00 2001 From: gayathrivijayakumar Date: Tue, 28 Jan 2025 14:25:19 +0530 Subject: [PATCH 2/2] Remove feature flag condition --- backend/backend/constants.py | 1 - .../prompt_studio_core_v2/views.py | 33 ++++--------------- .../unstract/platform_service/constants.py | 7 ---- .../src/unstract/prompt_service/constants.py | 6 ---- runner/src/unstract/runner/constants.py | 6 ---- .../src/unstract/tool_registry/constants.py | 7 ---- .../unstract/workflow_execution/constants.py | 6 ---- 7 files changed, 7 insertions(+), 59 deletions(-) diff --git a/backend/backend/constants.py b/backend/backend/constants.py index 53126cebe..26d944d9a 100644 --- a/backend/backend/constants.py +++ b/backend/backend/constants.py @@ -34,4 +34,3 @@ class FeatureFlag: """Temporary feature flags.""" APP_DEPLOYMENT = "app_deployment" - REMOTE_FILE_STORAGE = "remote_file_storage" diff --git a/backend/prompt_studio/prompt_studio_core_v2/views.py b/backend/prompt_studio/prompt_studio_core_v2/views.py index 4f796590d..1e2eab26f 100644 --- a/backend/prompt_studio/prompt_studio_core_v2/views.py +++ b/backend/prompt_studio/prompt_studio_core_v2/views.py @@ -8,7 +8,6 @@ from django.http import HttpRequest from file_management.constants import FileInformationKey as FileKey from file_management.exceptions import FileNotFound -from file_management.file_management_helper import FileManagerHelper from permissions.permission import IsOwner, IsOwnerOrSharedUser from prompt_studio.processor_loader import get_plugin_class_by_name, load_plugins from prompt_studio.prompt_profile_manager_v2.constants import ( @@ -57,10 +56,6 @@ from utils.file_storage.helpers.prompt_studio_file_helper import PromptStudioFileHelper from utils.user_session import UserSessionUtils -from backend.constants import FeatureFlag -from unstract.connectors.filesystems.local_storage.local_storage import LocalStorageFS -from unstract.flags.feature_flag import check_feature_flag_status - from .models import CustomTool from .serializers import ( CustomToolSerializer, @@ -460,27 +455,13 @@ def upload_for_ide(self, request: HttpRequest, pk: Any = 
None) -> Response: logger.info( f"Uploading file: {file_name}" if file_name else "Uploading file" ) - if not check_feature_flag_status(flag_key=FeatureFlag.REMOTE_FILE_STORAGE): - file_path = FileManagerHelper.handle_sub_directory_for_tenants( - UserSessionUtils.get_organization_id(request), - is_create=True, - user_id=custom_tool.created_by.user_id, - tool_id=str(custom_tool.tool_id), - ) - file_system = LocalStorageFS(settings={"path": file_path}) - FileManagerHelper.upload_file( - file_system, - file_path, - file_data, - file_name, - ) - else: - PromptStudioFileHelper.upload_for_ide( - org_id=UserSessionUtils.get_organization_id(request), - user_id=custom_tool.created_by.user_id, - tool_id=str(custom_tool.tool_id), - uploaded_file=file_data, - ) + + PromptStudioFileHelper.upload_for_ide( + org_id=UserSessionUtils.get_organization_id(request), + user_id=custom_tool.created_by.user_id, + tool_id=str(custom_tool.tool_id), + uploaded_file=file_data, + ) # Create a record in the db for the file document = PromptStudioDocumentHelper.create( diff --git a/platform-service/src/unstract/platform_service/constants.py b/platform-service/src/unstract/platform_service/constants.py index 0b429c465..97635731a 100644 --- a/platform-service/src/unstract/platform_service/constants.py +++ b/platform-service/src/unstract/platform_service/constants.py @@ -1,10 +1,3 @@ -class FeatureFlag: - """Temporary feature flags.""" - - # For enabling remote storage feature - REMOTE_FILE_STORAGE = "remote_file_storage" - - class DBTable: """Database tables.""" diff --git a/prompt-service/src/unstract/prompt_service/constants.py b/prompt-service/src/unstract/prompt_service/constants.py index 88d33b08f..6aed9e689 100644 --- a/prompt-service/src/unstract/prompt_service/constants.py +++ b/prompt-service/src/unstract/prompt_service/constants.py @@ -91,12 +91,6 @@ class RunLevel(Enum): TABLE_EXTRACTION = "TABLE_EXTRACTION" -class FeatureFlag: - """Temporary feature flags.""" - - REMOTE_FILE_STORAGE = "remote_file_storage" - - class DBTableV2: """Database tables.""" diff --git a/runner/src/unstract/runner/constants.py b/runner/src/unstract/runner/constants.py index 6db1d28c4..e34b31279 100644 --- a/runner/src/unstract/runner/constants.py +++ b/runner/src/unstract/runner/constants.py @@ -35,9 +35,3 @@ class Env: ) EXECUTION_DATA_DIR = "EXECUTION_DATA_DIR" FLIPT_SERVICE_AVAILABLE = "FLIPT_SERVICE_AVAILABLE" - - -class FeatureFlag: - """Temporary feature flags.""" - - REMOTE_FILE_STORAGE = "remote_file_storage" diff --git a/unstract/tool-registry/src/unstract/tool_registry/constants.py b/unstract/tool-registry/src/unstract/tool_registry/constants.py index 405a40136..334fc7442 100644 --- a/unstract/tool-registry/src/unstract/tool_registry/constants.py +++ b/unstract/tool-registry/src/unstract/tool_registry/constants.py @@ -1,13 +1,6 @@ from typing import Any -class FeatureFlag: - """Temporary feature flags.""" - - # For enabling remote storage feature - REMOTE_FILE_STORAGE = "remote_file_storage" - - class Tools: TOOLS_DIRECTORY = "tools" IMAGE_LATEST_TAG = "latest" diff --git a/unstract/workflow-execution/src/unstract/workflow_execution/constants.py b/unstract/workflow-execution/src/unstract/workflow_execution/constants.py index f7e309c98..04dd880fc 100644 --- a/unstract/workflow-execution/src/unstract/workflow_execution/constants.py +++ b/unstract/workflow-execution/src/unstract/workflow_execution/constants.py @@ -55,9 +55,3 @@ class ToolMetadataKey: class ToolOutputType: TXT = "TXT" JSON = "JSON" - - -class FeatureFlag: - 
"""Temporary feature flags.""" - - REMOTE_FILE_STORAGE = "remote_file_storage"