Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changes in worker and tools related to sdk filestorage #844

Merged
merged 12 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker/dockerfiles/worker.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ WORKDIR /app
COPY ${BUILD_CONTEXT_PATH} .
# Copy local dependency packages
COPY ${BUILD_PACKAGES_PATH}/core /unstract/core
COPY ${BUILD_PACKAGES_PATH}/flags /unstract/flags

RUN set -e; \
\
Expand Down
1 change: 1 addition & 0 deletions docker/dockerfiles/worker.Dockerfile.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ prompt-service
tools
unstract
!unstract/core
!unstract/flags
!worker
x2text-service
8 changes: 8 additions & 0 deletions tools/classifier/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,11 @@ TOOL_DATA_DIR=../data_dir

X2TEXT_HOST=http://unstract-x2text-service
X2TEXT_PORT=3004

# File System Configuration for Workflow Execution
# Directory path for execution data storage
# (e.g., bucket/execution/org_id/workflow_id/execution_id)
EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
chandrasekharan-zipstack marked this conversation as resolved.
Show resolved Hide resolved
# Storage provider for Workflow Execution (e.g., minio, S3)
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
2 changes: 1 addition & 1 deletion tools/classifier/src/config/properties.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"schemaVersion": "0.0.1",
"displayName": "File Classifier",
"functionName": "classify",
"toolVersion": "0.0.38",
"toolVersion": "0.0.39",
muhammad-ali-e marked this conversation as resolved.
Show resolved Hide resolved
"description": "Classifies a file into a bin based on its contents",
"input": {
"description": "File to be classified"
Expand Down
55 changes: 48 additions & 7 deletions tools/classifier/src/helper.py
muhammad-ali-e marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,50 @@ def copy_source_to_output_bin(
"""
try:
output_folder_bin = Path(self.output_dir) / classification
if not output_folder_bin.is_dir():
output_folder_bin.mkdir(parents=True, exist_ok=True)

output_file = output_folder_bin / source_name
shutil.copyfile(source_file, output_file)
if hasattr(self.tool, "workflow_filestorage"):
output_file = output_folder_bin / source_name
self._copy_file(
source_fs=self.tool.workflow_filestorage,
destination_fs=self.tool.workflow_filestorage,
source_path=source_file,
destination_path=str(output_file),
)
else:
if not output_folder_bin.is_dir():
output_folder_bin.mkdir(parents=True, exist_ok=True)

output_file = output_folder_bin / source_name
shutil.copyfile(source_file, output_file)
except Exception as e:
self.tool.stream_error_and_exit(f"Error creating output file: {e}")

def _copy_file(
self,
source_fs: Any,
destination_fs: Any,
source_path: str,
destination_path: str,
) -> None:
"""Helps copy a file from source to destination.

Args:
src (str): Path to the source file
dest (str): Path to the destination file
"""
try:
# TODO: Move it to the top once SDK released with fileStorage Feature
# Change the source fs and destination fs type to to FileStorage
from unstract.sdk.utils import FileStorageUtils

FileStorageUtils.copy_file_to_destination(
gaya3-zipstack marked this conversation as resolved.
Show resolved Hide resolved
source_storage=source_fs,
destination_storage=destination_fs,
source_path=source_path,
destination_paths=[destination_path],
)
except Exception as e:
self.stream_error_and_exit(f"Error copying file: {e}")

def extract_text(
self, file: str, text_extraction_adapter_id: Optional[str]
) -> Optional[str]:
Expand Down Expand Up @@ -127,8 +163,13 @@ def _extract_from_file(self, file: str) -> Optional[str]:
"""
self.tool.stream_log("Extracting text from file")
try:
with open(file, "rb") as f:
text = f.read().decode("utf-8")
if hasattr(self.tool, "workflow_filestorage"):
text = self.tool.workflow_filestorage.read(path=file, mode="rb").decode(
"utf-8"
)
else:
with open(file, "rb") as f:
text = f.read().decode("utf-8")
except Exception as e:
self.tool.stream_log(f"File error: {e}")
return None
Expand Down
8 changes: 8 additions & 0 deletions tools/structure/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,11 @@ PROMPT_PORT=3003

X2TEXT_HOST=http://unstract-x2text-service
X2TEXT_PORT=3004

# File System Configuration for Workflow Execution
# Directory path for execution data storage
# (e.g., bucket/execution/org_id/workflow_id/execution_id)
EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
# Storage provider for Workflow Execution (e.g., minio, S3)
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
2 changes: 1 addition & 1 deletion tools/structure/src/config/properties.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"schemaVersion": "0.0.1",
"displayName": "Structure Tool",
"functionName": "structure_tool",
"toolVersion": "0.0.46",
"toolVersion": "0.0.48",
muhammad-ali-e marked this conversation as resolved.
Show resolved Hide resolved
"description": "This is a template tool which can answer set of input prompts designed in the Prompt Studio",
"input": {
"description": "File that needs to be indexed and parsed for answers"
Expand Down
34 changes: 29 additions & 5 deletions tools/structure/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def run(
f"Function to higlight context is not found. {PAID_FEATURE_MSG}",
level=LogLevel.WARN,
)

workflow_filestorage = getattr(self, "workflow_filestorage", None)
if tool_settings[SettingsKeys.ENABLE_SINGLE_PASS_EXTRACTION]:
index.index(
tool_id=tool_id,
Expand All @@ -124,6 +124,11 @@ def run(
reindex=True,
usage_kwargs=usage_kwargs,
process_text=process_text,
**(
{"fs": workflow_filestorage}
if workflow_filestorage is not None
else {}
),
)
if summarize_as_source:
summarize_file_hash = self._summarize_and_index(
Expand Down Expand Up @@ -221,7 +226,12 @@ def run(
transform_dict,
)

highlight_data = transform_dict(epilogue, tool_data_dir)
if hasattr(self, "workflow_filestorage"):
highlight_data = transform_dict(
epilogue, tool_data_dir, self.workflow_filestorage
)
else:
highlight_data = transform_dict(epilogue, tool_data_dir)
metadata[SettingsKeys.HIGHLIGHT_DATA] = highlight_data
metadata[SettingsKeys.CONFIDENCE_DATA] = get_confidence_data(
epilogue, tool_data_dir
Expand Down Expand Up @@ -291,13 +301,23 @@ def _summarize_and_index(
summarize_file_path = tool_data_dir / SettingsKeys.SUMMARIZE

summarized_context = ""
if summarize_file_path.exists():
if hasattr(self, "workflow_filestorage"):
if self.workflow_filestorage.exists(summarize_file_path):
summarized_context = self.workflow_filestorage.read(
path=summarize_file_path, mode="r"
)
elif summarize_file_path.exists():
with open(summarize_file_path, encoding="utf-8") as f:
summarized_context = f.read()
if not summarized_context:
context = ""
with open(extract_file_path, encoding="utf-8") as file:
context = file.read()
if hasattr(self, "workflow_filestorage"):
context = self.workflow_filestorage.read(
path=extract_file_path, mode="r"
)
else:
with open(extract_file_path, encoding="utf-8") as file:
context = file.read()
prompt_keys = []
for output in outputs:
prompt_keys.append(output[SettingsKeys.NAME])
Expand Down Expand Up @@ -329,6 +349,7 @@ def _summarize_and_index(
summarize_file_hash: str = ToolUtils.get_hash_from_file(
file_path=summarize_file_path
)
workflow_filestorage = getattr(self, "workflow_filestorage", None)
index.index(
tool_id=tool_id,
embedding_instance_id=embedding_instance_id,
Expand All @@ -339,6 +360,9 @@ def _summarize_and_index(
chunk_size=0,
chunk_overlap=0,
usage_kwargs=usage_kwargs,
**(
{"fs": workflow_filestorage} if workflow_filestorage is not None else {}
),
)
return summarize_file_hash

Expand Down
8 changes: 8 additions & 0 deletions tools/text_extractor/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,11 @@ TOOL_DATA_DIR=
# X2TEXT service
X2TEXT_HOST=
X2TEXT_PORT=

# File System Configuration for Workflow Execution
# Directory path for execution data storage
# (e.g., bucket/execution/org_id/workflow_id/execution_id)
EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
# Storage provider for Workflow Execution (e.g., minio, S3)
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
2 changes: 1 addition & 1 deletion tools/text_extractor/src/config/properties.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"schemaVersion": "0.0.1",
"displayName": "Text Extractor",
"functionName": "text_extractor",
"toolVersion": "0.0.36",
"toolVersion": "0.0.37",
muhammad-ali-e marked this conversation as resolved.
Show resolved Hide resolved
"description": "The Text Extractor is a powerful tool designed to convert documents to its text form or Extract texts from documents",
"input": {
"description": "Document"
Expand Down
9 changes: 7 additions & 2 deletions tools/text_extractor/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,13 @@ def run(
self.stream_log("Preparing to write the extracted text.")
if source_name:
output_path = Path(output_dir) / f"{Path(source_name).stem}.txt"
with open(output_path, "w", encoding="utf-8") as file:
file.write(extracted_text)
if hasattr(self, "workflow_filestorage"):
self.workflow_filestorage.write(
path=output_path, mode="w", data=extracted_text
)
else:
with open(output_path, "w", encoding="utf-8") as file:
file.write(extracted_text)

self.stream_log("Tool output written successfully.")
else:
Expand Down
Loading