Skip to content

Commit

Permalink
changes in worker and tools related to sdk filestorage (#844)
Browse files Browse the repository at this point in the history
* changes in worker and tools related to sdk filestorage

* Commit pdm.lock changes

* updated sample envs with command and bumped the versions

* updated worker sample.env with FS and flipt envs

* Commit pdm.lock changes

* updated for plugin and tool sdk

* Update tools/structure/src/config/properties.json

Co-authored-by: Chandrasekharan M <[email protected]>
Signed-off-by: ali <[email protected]>

* Update tools/text_extractor/src/config/properties.json

Co-authored-by: Chandrasekharan M <[email protected]>
Signed-off-by: ali <[email protected]>

* Update tools/classifier/src/config/properties.json

Co-authored-by: Chandrasekharan M <[email protected]>
Signed-off-by: ali <[email protected]>

* resolve conflict with main

* Commit pdm.lock changes

---------

Signed-off-by: ali <[email protected]>
Co-authored-by: muhammad-ali-e <[email protected]>
Co-authored-by: Chandrasekharan M <[email protected]>
  • Loading branch information
3 people authored Nov 22, 2024
1 parent ec02804 commit c685aba
Show file tree
Hide file tree
Showing 18 changed files with 1,211 additions and 898 deletions.
1 change: 1 addition & 0 deletions docker/dockerfiles/worker.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ WORKDIR /app
COPY ${BUILD_CONTEXT_PATH} .
# Copy local dependency packages
COPY ${BUILD_PACKAGES_PATH}/core /unstract/core
COPY ${BUILD_PACKAGES_PATH}/flags /unstract/flags

RUN set -e; \
\
Expand Down
1 change: 1 addition & 0 deletions docker/dockerfiles/worker.Dockerfile.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ prompt-service
tools
unstract
!unstract/core
!unstract/flags
!worker
x2text-service
8 changes: 8 additions & 0 deletions tools/classifier/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,11 @@ TOOL_DATA_DIR=../data_dir

X2TEXT_HOST=http://unstract-x2text-service
X2TEXT_PORT=3004

# File System Configuration for Workflow Execution
# Directory path for execution data storage
# (e.g., bucket/execution/org_id/workflow_id/execution_id)
EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
# Storage provider for Workflow Execution (e.g., minio, S3)
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
2 changes: 1 addition & 1 deletion tools/classifier/src/config/properties.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"schemaVersion": "0.0.1",
"displayName": "File Classifier",
"functionName": "classify",
"toolVersion": "0.0.39",
"toolVersion": "0.0.40",
"description": "Classifies a file into a bin based on its contents",
"input": {
"description": "File to be classified"
Expand Down
55 changes: 48 additions & 7 deletions tools/classifier/src/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,50 @@ def copy_source_to_output_bin(
"""
try:
output_folder_bin = Path(self.output_dir) / classification
if not output_folder_bin.is_dir():
output_folder_bin.mkdir(parents=True, exist_ok=True)

output_file = output_folder_bin / source_name
shutil.copyfile(source_file, output_file)
if hasattr(self.tool, "workflow_filestorage"):
output_file = output_folder_bin / source_name
self._copy_file(
source_fs=self.tool.workflow_filestorage,
destination_fs=self.tool.workflow_filestorage,
source_path=source_file,
destination_path=str(output_file),
)
else:
if not output_folder_bin.is_dir():
output_folder_bin.mkdir(parents=True, exist_ok=True)

output_file = output_folder_bin / source_name
shutil.copyfile(source_file, output_file)
except Exception as e:
self.tool.stream_error_and_exit(f"Error creating output file: {e}")

def _copy_file(
    self,
    source_fs: Any,
    destination_fs: Any,
    source_path: str,
    destination_path: str,
) -> None:
    """Copy a single file from one file storage to another.

    Delegates to ``FileStorageUtils.copy_file_to_destination`` from the
    Unstract SDK. Any exception raised while importing the SDK helper or
    while copying is reported through ``self.tool.stream_error_and_exit``.

    Args:
        source_fs (Any): Storage object the file is read from.
        destination_fs (Any): Storage object the file is written to.
        source_path (str): Path of the file to copy, within ``source_fs``.
        destination_path (str): Path to write to, within ``destination_fs``.
    """
    try:
        # TODO: Move it to the top once SDK released with fileStorage Feature
        # Change the source fs and destination fs type to FileStorage
        from unstract.sdk.utils import FileStorageUtils

        FileStorageUtils.copy_file_to_destination(
            source_storage=source_fs,
            destination_storage=destination_fs,
            source_path=source_path,
            # The SDK helper accepts a list of destinations; only one is used.
            destination_paths=[destination_path],
        )
    except Exception as e:
        # Report through the tool, as the other error/log paths in this
        # helper do (they all call self.tool.<stream_*>).
        self.tool.stream_error_and_exit(f"Error copying file: {e}")

def extract_text(
self, file: str, text_extraction_adapter_id: Optional[str]
) -> Optional[str]:
Expand Down Expand Up @@ -127,8 +163,13 @@ def _extract_from_file(self, file: str) -> Optional[str]:
"""
self.tool.stream_log("Extracting text from file")
try:
with open(file, "rb") as f:
text = f.read().decode("utf-8")
if hasattr(self.tool, "workflow_filestorage"):
text = self.tool.workflow_filestorage.read(path=file, mode="rb").decode(
"utf-8"
)
else:
with open(file, "rb") as f:
text = f.read().decode("utf-8")
except Exception as e:
self.tool.stream_log(f"File error: {e}")
return None
Expand Down
8 changes: 8 additions & 0 deletions tools/structure/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,11 @@ PROMPT_PORT=3003

X2TEXT_HOST=http://unstract-x2text-service
X2TEXT_PORT=3004

# File System Configuration for Workflow Execution
# Directory path for execution data storage
# (e.g., bucket/execution/org_id/workflow_id/execution_id)
EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
# Storage provider for Workflow Execution (e.g., minio, S3)
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
2 changes: 1 addition & 1 deletion tools/structure/src/config/properties.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"schemaVersion": "0.0.1",
"displayName": "Structure Tool",
"functionName": "structure_tool",
"toolVersion": "0.0.49",
"toolVersion": "0.0.50",
"description": "This is a template tool which can answer set of input prompts designed in the Prompt Studio",
"input": {
"description": "File that needs to be indexed and parsed for answers"
Expand Down
27 changes: 23 additions & 4 deletions tools/structure/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def run(
f"Function to higlight context is not found. {PAID_FEATURE_MSG}",
level=LogLevel.WARN,
)

workflow_filestorage = getattr(self, "workflow_filestorage", None)
if tool_settings[SettingsKeys.ENABLE_SINGLE_PASS_EXTRACTION]:
index.index(
tool_id=tool_id,
Expand All @@ -143,6 +143,11 @@ def run(
reindex=True,
usage_kwargs=usage_kwargs,
process_text=process_text,
**(
{"fs": workflow_filestorage}
if workflow_filestorage is not None
else {}
),
)
if summarize_as_source:
summarize_file_hash = self._summarize_and_index(
Expand Down Expand Up @@ -305,13 +310,23 @@ def _summarize_and_index(
summarize_file_path = tool_data_dir / SettingsKeys.SUMMARIZE

summarized_context = ""
if summarize_file_path.exists():
if hasattr(self, "workflow_filestorage"):
if self.workflow_filestorage.exists(summarize_file_path):
summarized_context = self.workflow_filestorage.read(
path=summarize_file_path, mode="r"
)
elif summarize_file_path.exists():
with open(summarize_file_path, encoding="utf-8") as f:
summarized_context = f.read()
if not summarized_context:
context = ""
with open(extract_file_path, encoding="utf-8") as file:
context = file.read()
if hasattr(self, "workflow_filestorage"):
context = self.workflow_filestorage.read(
path=extract_file_path, mode="r"
)
else:
with open(extract_file_path, encoding="utf-8") as file:
context = file.read()
prompt_keys = []
for output in outputs:
prompt_keys.append(output[SettingsKeys.NAME])
Expand Down Expand Up @@ -343,6 +358,7 @@ def _summarize_and_index(
summarize_file_hash: str = ToolUtils.get_hash_from_file(
file_path=summarize_file_path
)
workflow_filestorage = getattr(self, "workflow_filestorage", None)
index.index(
tool_id=tool_id,
embedding_instance_id=embedding_instance_id,
Expand All @@ -353,6 +369,9 @@ def _summarize_and_index(
chunk_size=0,
chunk_overlap=0,
usage_kwargs=usage_kwargs,
**(
{"fs": workflow_filestorage} if workflow_filestorage is not None else {}
),
)
return summarize_file_hash

Expand Down
8 changes: 8 additions & 0 deletions tools/text_extractor/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,11 @@ TOOL_DATA_DIR=
# X2TEXT service
X2TEXT_HOST=
X2TEXT_PORT=

# File System Configuration for Workflow Execution
# Directory path for execution data storage
# (e.g., bucket/execution/org_id/workflow_id/execution_id)
EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
# Storage provider for Workflow Execution (e.g., minio, S3)
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
2 changes: 1 addition & 1 deletion tools/text_extractor/src/config/properties.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"schemaVersion": "0.0.1",
"displayName": "Text Extractor",
"functionName": "text_extractor",
"toolVersion": "0.0.37",
"toolVersion": "0.0.38",
"description": "The Text Extractor is a powerful tool designed to convert documents to its text form or Extract texts from documents",
"input": {
"description": "Document"
Expand Down
9 changes: 7 additions & 2 deletions tools/text_extractor/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,13 @@ def run(
self.stream_log("Preparing to write the extracted text.")
if source_name:
output_path = Path(output_dir) / f"{Path(source_name).stem}.txt"
with open(output_path, "w", encoding="utf-8") as file:
file.write(extracted_text)
if hasattr(self, "workflow_filestorage"):
self.workflow_filestorage.write(
path=output_path, mode="w", data=extracted_text
)
else:
with open(output_path, "w", encoding="utf-8") as file:
file.write(extracted_text)

self.stream_log("Tool output written successfully.")
else:
Expand Down
Loading

0 comments on commit c685aba

Please sign in to comment.