Merge branch 'main' into feat/remote-storage-prompt-studio
Signed-off-by: harini-venkataraman <[email protected]>
harini-venkataraman authored Nov 26, 2024
2 parents 6aee781 + c4a6593 commit fe493cd
Showing 54 changed files with 2,072 additions and 1,270 deletions.
4 changes: 1 addition & 3 deletions backend/api_v2/deployment_helper.py
@@ -173,9 +173,7 @@ def execute_workflow(
result.status_api = DeploymentHelper.construct_status_endpoint(
api_endpoint=api.api_endpoint, execution_id=execution_id
)
if include_metadata:
result.remove_result_metadata_keys(keys_to_remove=["highlight_data"])
else:
if not include_metadata:
result.remove_result_metadata_keys()
except Exception as error:
DestinationConnector.delete_api_storage_dir(
1 change: 1 addition & 0 deletions backend/backend/constants.py
@@ -34,3 +34,4 @@ class FeatureFlag:
"""Temporary feature flags."""

APP_DEPLOYMENT = "app_deployment"
REMOTE_FILE_STORAGE = "remote_file_storage"
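
The new flag is what gates every remote-storage code path added in this commit. A minimal sketch of the gating pattern as it appears across the changed modules (the branch bodies below are illustrative placeholders, not lines from this diff):

```python
from backend.constants import FeatureFlag
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
    # Remote storage path introduced by this commit
    from unstract.filesystem import FileStorageType, FileSystem

    file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
    file_storage = file_system.get_file_storage()
else:
    # Flag off: keep the pre-existing local-filesystem behaviour
    file_storage = None  # placeholder; real call sites fall back to open()/fsspec
```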
176 changes: 87 additions & 89 deletions backend/pdm.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions backend/prompt_studio/prompt_studio_core_v2/constants.py
@@ -94,6 +94,7 @@ class ToolStudioPromptKeys:
SUMMARIZE_AS_SOURCE = "summarize_as_source"
VARIABLE_MAP = "variable_map"
RECORD = "record"
FILE_PATH = "file_path"
ENABLE_HIGHLIGHT = "enable_highlight"


Original file line number Diff line number Diff line change
@@ -5,6 +5,9 @@
from unstract.sdk.constants import LogLevel
from unstract.sdk.tool.stream import StreamMixin

from backend.constants import FeatureFlag
from unstract.flags.feature_flag import check_feature_flag_status


class PromptIdeBaseTool(StreamMixin):
def __init__(self, log_level: LogLevel = LogLevel.INFO, org_id: str = "") -> None:
@@ -16,6 +19,13 @@ def __init__(self, log_level: LogLevel = LogLevel.INFO, org_id: str = "") -> Non
"""
self.log_level = log_level
self.org_id = org_id
self.workflow_filestorage = None
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem

file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
self.workflow_filestorage = file_system.get_file_storage()

super().__init__(log_level=log_level)

def get_env_or_die(self, env_key: str) -> str:
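
A short usage sketch of the new `workflow_filestorage` attribute; the org id and file path below are placeholders, and the class import is omitted because the module's path is not shown in this view:

```python
from unstract.sdk.constants import LogLevel

# Assumes PromptIdeBaseTool is imported from its module (not visible in this view).
tool = PromptIdeBaseTool(log_level=LogLevel.INFO, org_id="org-123")  # placeholder org id

if tool.workflow_filestorage is not None:
    # REMOTE_FILE_STORAGE enabled: reads go through the workflow-execution storage
    text = tool.workflow_filestorage.read(
        path="unstract/execution/sample.txt",  # illustrative path, not from this diff
        encoding="utf-8",
    )
else:
    # Flag disabled: callers keep their existing local-filesystem handling
    text = None
```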
Original file line number Diff line number Diff line change
@@ -859,6 +859,7 @@ def _fetch_response(
TSPKeys.RUN_ID: run_id,
TSPKeys.FILE_NAME: doc_name,
TSPKeys.FILE_HASH: file_hash,
TSPKeys.FILE_PATH: doc_path,
Common.LOG_EVENTS_ID: StateStore.get(Common.LOG_EVENTS_ID),
}

2 changes: 1 addition & 1 deletion backend/pyproject.toml
@@ -32,7 +32,7 @@ dependencies = [
"python-socketio==5.9.0", # For log_events
"social-auth-app-django==5.3.0", # For OAuth
"social-auth-core==4.4.2", # For OAuth
"unstract-sdk~=0.54.0rc1",
"unstract-sdk~=0.54.0rc2",
# ! IMPORTANT!
# Indirect local dependencies usually need to be added in their own projects
# as: https://pdm-project.org/latest/usage/dependency/#local-dependencies.
23 changes: 21 additions & 2 deletions backend/sample.env
@@ -82,9 +82,9 @@ REMOTE_PROMPT_STUDIO_FILE_PATH=

# Structure Tool Image (Runs prompt studio exported tools)
# https://hub.docker.com/r/unstract/tool-structure
STRUCTURE_TOOL_IMAGE_URL="docker:unstract/tool-structure:0.0.48"
STRUCTURE_TOOL_IMAGE_URL="docker:unstract/tool-structure:0.0.49"
STRUCTURE_TOOL_IMAGE_NAME="unstract/tool-structure"
STRUCTURE_TOOL_IMAGE_TAG="0.0.48"
STRUCTURE_TOOL_IMAGE_TAG="0.0.49"

# Feature Flags
EVALUATION_SERVER_IP=unstract-flipt
@@ -145,3 +145,22 @@ TOOL_REGISTRY_CONFIG_PATH="/data/tool_registry_config"

# Flipt Service
FLIPT_SERVICE_AVAILABLE=False


# File System Configuration for Workflow and API Execution

# Directory Prefixes for storing execution files
WORKFLOW_EXECUTION_DIR_PREFIX="unstract/execution"
API_EXECUTION_DIR_PREFIX="unstract/api"

# Storage Provider for Workflow Execution
# Valid options: MINIO, S3, etc.
WORKFLOW_EXECUTION_FS_PROVIDER="MINIO"
WORKFLOW_EXECUTION_FS_CREDENTIAL='{"endpoint_url": "", "key": "", "secret": ""}'

# Storage Provider for API Execution
API_STORAGE_FS_PROVIDER="MINIO"
API_STORAGE_FS_CREDENTIAL='{"endpoint_url": "", "key": "", "secret": ""}'

# Optional: Legacy storage path (if applicable)
LEGACY_STORAGE_PATH="/path/to/legacy/storage"
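
The `*_FS_CREDENTIAL` values above are JSON strings; a minimal, self-contained sketch of how a service might parse them (the helper below is hypothetical, not part of the backend):

```python
import json
import os


def load_fs_settings(provider_var: str, credential_var: str) -> tuple[str, dict]:
    """Hypothetical helper: read a provider name and its JSON credential blob from the env."""
    provider = os.environ.get(provider_var, "MINIO")
    credential = json.loads(os.environ.get(credential_var, "{}"))
    return provider, credential


provider, credential = load_fs_settings(
    "WORKFLOW_EXECUTION_FS_PROVIDER", "WORKFLOW_EXECUTION_FS_CREDENTIAL"
)
# With the sample values above: provider == "MINIO",
# credential == {"endpoint_url": "", "key": "", "secret": ""}
```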
3 changes: 0 additions & 3 deletions backend/utils/constants.py
@@ -20,9 +20,6 @@ class FeatureFlag:

REMOTE_FILE_STORAGE = "remote_file_storage"

pass


class Common:
METADATA = "metadata"
MODULE = "module"
39 changes: 28 additions & 11 deletions backend/workflow_manager/endpoint_v2/base_connector.py
@@ -7,8 +7,13 @@
from utils.constants import Common
from utils.user_context import UserContext

from backend.constants import FeatureFlag
from unstract.connectors.filesystems import connectors
from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem


class BaseConnector(ExecutionFileHandler):
@@ -22,13 +27,14 @@ def __init__(
This class serves as a base for connectors and provides common
utilities.
"""
if not (settings.API_STORAGE_DIR and settings.WORKFLOW_DATA_DIR):
raise ValueError("Missed env API_STORAGE_DIR or WORKFLOW_DATA_DIR")
super().__init__(workflow_id, execution_id, organization_id)
# Directory path for storing execution-related files for API
self.api_storage_dir: str = self.create_execution_dir_path(
workflow_id, execution_id, organization_id, settings.API_STORAGE_DIR
)
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
if not (settings.API_STORAGE_DIR and settings.WORKFLOW_DATA_DIR):
raise ValueError("Missed env API_STORAGE_DIR or WORKFLOW_DATA_DIR")
# Directory path for storing execution-related files for API
self.api_storage_dir: str = self.create_execution_dir_path(
workflow_id, execution_id, organization_id, settings.API_STORAGE_DIR
)

def get_fsspec(
self, settings: dict[str, Any], connector_id: str
@@ -80,8 +86,14 @@ def get_json_schema(cls, file_path: str) -> dict[str, Any]:
json.JSONDecodeError: If there is an issue decoding the JSON file.
"""
try:
with open(file_path, encoding="utf-8") as file:
schema: dict[str, Any] = json.load(file)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
file_contents = file_storage.read(path=file_path, encoding="utf-8")
schema: dict[str, Any] = json.loads(file_contents)
else:
with open(file_path, encoding="utf-8") as file:
schema: dict[str, Any] = json.load(file)
except OSError:
schema = {}
return schema
@@ -100,7 +112,12 @@ def get_api_storage_dir_path(cls, workflow_id: str, execution_id: str) -> str:
str: The directory path for the execution.
"""
organization_id = UserContext.get_organization_identifier()
api_storage_dir: str = cls.create_execution_dir_path(
workflow_id, execution_id, organization_id, settings.API_STORAGE_DIR
)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
api_storage_dir: str = cls.get_api_execution_dir(
workflow_id, execution_id, organization_id
)
else:
api_storage_dir: str = cls.create_execution_dir_path(
workflow_id, execution_id, organization_id, settings.API_STORAGE_DIR
)
return api_storage_dir
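
Caller-facing behaviour of the updated `get_json_schema` is unchanged by the flag: with REMOTE_FILE_STORAGE on, the path is read through the workflow-execution FileStorage; with it off, a local `open()` is used. A brief usage sketch (the path is a placeholder):

```python
from workflow_manager.endpoint_v2.base_connector import BaseConnector

schema = BaseConnector.get_json_schema("/data/execution/settings.json")  # placeholder path
if not schema:
    # A read failure (OSError) is swallowed and an empty dict is returned
    print("No schema available at that path")
```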
63 changes: 59 additions & 4 deletions backend/workflow_manager/endpoint_v2/destination.py
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@
from connector_v2.models import ConnectorInstance
from fsspec.implementations.local import LocalFileSystem
from unstract.sdk.constants import ToolExecKey
from unstract.sdk.tool.mime_types import EXT_MIME_MAP
from unstract.workflow_execution.constants import ToolOutputType
from utils.user_context import UserContext
from workflow_manager.endpoint_v2.base_connector import BaseConnector
@@ -36,8 +37,13 @@
from workflow_manager.workflow_v2.models.file_history import FileHistory
from workflow_manager.workflow_v2.models.workflow import Workflow

from backend.constants import FeatureFlag
from backend.exceptions import UnstractFSException
from unstract.connectors.exceptions import ConnectorError
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem

logger = logging.getLogger(__name__)

@@ -405,6 +411,8 @@ def get_result(self, file_history: Optional[FileHistory] = None) -> Optional[Any
Returns:
Union[dict[str, Any], str]: Result data.
"""
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
return self.get_result_with_file_storage(file_history=file_history)
if file_history and file_history.result:
return self.parse_string(file_history.result)
output_file = os.path.join(self.execution_dir, WorkflowFileType.INFILE)
@@ -434,6 +442,43 @@
logger.error(f"Error while getting result {err}")
return result

def get_result_with_file_storage(
self, file_history: Optional[FileHistory] = None
) -> Optional[Any]:
"""Get result data from the output file.
Returns:
Union[dict[str, Any], str]: Result data.
"""
if file_history and file_history.result:
return self.parse_string(file_history.result)
output_file = os.path.join(self.execution_dir, WorkflowFileType.INFILE)
metadata: dict[str, Any] = self.get_workflow_metadata()
output_type = self.get_output_type(metadata)
result: Union[dict[str, Any], str] = ""
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
try:
# TODO: SDK handles validation; consider removing here.
file_type = file_storage.mime_type(output_file)
if output_type == ToolOutputType.JSON:
if file_type != EXT_MIME_MAP[ToolOutputType.JSON.lower()]:
logger.error(f"Output type json mismatched {file_type}")
raise ToolOutputTypeMismatch()
file_content = file_storage.read(output_file, mode="r")
result = json.loads(file_content)
elif output_type == ToolOutputType.TXT:
if file_type == EXT_MIME_MAP[ToolOutputType.JSON.lower()]:
logger.error(f"Output type txt mismatched {file_type}")
raise ToolOutputTypeMismatch()
file_content = file_storage.read(output_file, mode="r")
result = file_content.encode("utf-8").decode("unicode-escape")
else:
raise InvalidToolOutputType()
except (FileNotFoundError, json.JSONDecodeError) as err:
logger.error(f"Error while getting result {err}")
return result

def get_metadata(
self, file_history: Optional[FileHistory] = None
) -> Optional[dict[str, Any]]:
@@ -454,8 +499,13 @@ def delete_execution_directory(self) -> None:
Returns:
None
"""
fs: LocalFileSystem = fsspec.filesystem("file")
fs.rm(self.execution_dir, recursive=True)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
file_storage.rm(self.execution_dir, recursive=True)
else:
fs: LocalFileSystem = fsspec.filesystem("file")
fs.rm(self.execution_dir, recursive=True)
self.delete_api_storage_dir(self.workflow_id, self.execution_id)

@classmethod
@@ -468,8 +518,13 @@ def delete_api_storage_dir(cls, workflow_id: str, execution_id: str) -> None:
api_storage_dir = cls.get_api_storage_dir_path(
workflow_id=workflow_id, execution_id=execution_id
)
fs: LocalFileSystem = fsspec.filesystem("file")
fs.rm(api_storage_dir, recursive=True)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.API_EXECUTION)
file_storage = file_system.get_file_storage()
file_storage.rm(api_storage_dir, recursive=True)
else:
fs: LocalFileSystem = fsspec.filesystem("file")
fs.rm(api_storage_dir, recursive=True)

@classmethod
def create_endpoint_for_workflow(
(Diff listing truncated; the remaining changed files are not shown in this view.)
