Skip to content

Commit

Permalink
changes in worker and tools related to sdk filestorage
Browse files Browse the repository at this point in the history
  • Loading branch information
muhammad-ali-e committed Nov 15, 2024
1 parent 7f92acd commit 5014209
Show file tree
Hide file tree
Showing 14 changed files with 250 additions and 31 deletions.
1 change: 1 addition & 0 deletions docker/dockerfiles/worker.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ WORKDIR /app
COPY ${BUILD_CONTEXT_PATH} .
# Copy local dependency packages
COPY ${BUILD_PACKAGES_PATH}/core /unstract/core
COPY ${BUILD_PACKAGES_PATH}/flags /unstract/flags

RUN set -e; \
\
Expand Down
1 change: 1 addition & 0 deletions docker/dockerfiles/worker.Dockerfile.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ prompt-service
tools
unstract
!unstract/core
!unstract/flags
!worker
x2text-service
4 changes: 4 additions & 0 deletions tools/classifier/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ TOOL_DATA_DIR=../data_dir

X2TEXT_HOST=http://unstract-x2text-service
X2TEXT_PORT=3004

EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
55 changes: 48 additions & 7 deletions tools/classifier/src/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,50 @@ def copy_source_to_output_bin(
"""
try:
output_folder_bin = Path(self.output_dir) / classification
if not output_folder_bin.is_dir():
output_folder_bin.mkdir(parents=True, exist_ok=True)

output_file = output_folder_bin / source_name
shutil.copyfile(source_file, output_file)
if hasattr(self.tool, "workflow_filestorage"):
output_file = output_folder_bin / source_name
self._copy_file(
source_fs=self.tool.workflow_filestorage,
destination_fs=self.tool.workflow_filestorage,
source_path=source_file,
destination_path=str(output_file),
)
else:
if not output_folder_bin.is_dir():
output_folder_bin.mkdir(parents=True, exist_ok=True)

output_file = output_folder_bin / source_name
shutil.copyfile(source_file, output_file)
except Exception as e:
self.tool.stream_error_and_exit(f"Error creating output file: {e}")

def _copy_file(
    self,
    source_fs: Any,
    destination_fs: Any,
    source_path: str,
    destination_path: str,
) -> None:
    """Copy a single file from one file storage to another.

    The copy is delegated to the SDK's
    ``FileStorageUtils.copy_file_to_destination``. Any failure (including
    the SDK import itself failing) is reported through the tool's
    ``stream_error_and_exit`` with an "Error copying file" message.

    Args:
        source_fs (Any): Storage object the file is read from.
        destination_fs (Any): Storage object the file is written to.
        source_path (str): Path of the file within the source storage.
        destination_path (str): Path the file is copied to within the
            destination storage.
    """
    try:
        # TODO: Move it to the top once SDK released with fileStorage Feature
        # Change the source fs and destination fs type to FileStorage
        from unstract.sdk.utils import FileStorageUtils

        FileStorageUtils.copy_file_to_destination(
            source_storage=source_fs,
            destination_storage=destination_fs,
            source_path=source_path,
            # The SDK helper takes a list of destinations; only one is needed.
            destination_paths=[destination_path],
        )
    except Exception as e:
        # Error reporting lives on the wrapped tool, not on this helper;
        # the other methods of this class also go through `self.tool`.
        self.tool.stream_error_and_exit(f"Error copying file: {e}")

def extract_text(
self, file: str, text_extraction_adapter_id: Optional[str]
) -> Optional[str]:
Expand Down Expand Up @@ -127,8 +163,13 @@ def _extract_from_file(self, file: str) -> Optional[str]:
"""
self.tool.stream_log("Extracting text from file")
try:
with open(file, "rb") as f:
text = f.read().decode("utf-8")
if hasattr(self.tool, "workflow_filestorage"):
text = self.tool.workflow_filestorage.read(path=file, mode="rb").decode(
"utf-8"
)
else:
with open(file, "rb") as f:
text = f.read().decode("utf-8")
except Exception as e:
self.tool.stream_log(f"File error: {e}")
return None
Expand Down
4 changes: 4 additions & 0 deletions tools/structure/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ PROMPT_PORT=3003

X2TEXT_HOST=http://unstract-x2text-service
X2TEXT_PORT=3004

EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
27 changes: 23 additions & 4 deletions tools/structure/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def run(
f"Function to higlight context is not found. {PAID_FEATURE_MSG}",
level=LogLevel.WARN,
)

workflow_filestorage = getattr(self, "workflow_filestorage", None)
if tool_settings[SettingsKeys.ENABLE_SINGLE_PASS_EXTRACTION]:
index.index(
tool_id=tool_id,
Expand All @@ -124,6 +124,11 @@ def run(
reindex=True,
usage_kwargs=usage_kwargs,
process_text=process_text,
**(
{"fs": workflow_filestorage}
if workflow_filestorage is not None
else {}
),
)
if summarize_as_source:
summarize_file_hash = self._summarize_and_index(
Expand Down Expand Up @@ -291,13 +296,23 @@ def _summarize_and_index(
summarize_file_path = tool_data_dir / SettingsKeys.SUMMARIZE

summarized_context = ""
if summarize_file_path.exists():
if hasattr(self, "workflow_filestorage"):
if self.workflow_filestorage.exists(summarize_file_path):
summarized_context = self.workflow_filestorage.read(
path=summarize_file_path, mode="r"
)
elif summarize_file_path.exists():
with open(summarize_file_path, encoding="utf-8") as f:
summarized_context = f.read()
if not summarized_context:
context = ""
with open(extract_file_path, encoding="utf-8") as file:
context = file.read()
if hasattr(self, "workflow_filestorage"):
context = self.workflow_filestorage.read(
path=extract_file_path, mode="r"
)
else:
with open(extract_file_path, encoding="utf-8") as file:
context = file.read()
prompt_keys = []
for output in outputs:
prompt_keys.append(output[SettingsKeys.NAME])
Expand Down Expand Up @@ -329,6 +344,7 @@ def _summarize_and_index(
summarize_file_hash: str = ToolUtils.get_hash_from_file(
file_path=summarize_file_path
)
workflow_filestorage = getattr(self, "workflow_filestorage", None)
index.index(
tool_id=tool_id,
embedding_instance_id=embedding_instance_id,
Expand All @@ -339,6 +355,9 @@ def _summarize_and_index(
chunk_size=0,
chunk_overlap=0,
usage_kwargs=usage_kwargs,
**(
{"fs": workflow_filestorage} if workflow_filestorage is not None else {}
),
)
return summarize_file_hash

Expand Down
4 changes: 4 additions & 0 deletions tools/text_extractor/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ TOOL_DATA_DIR=
# X2TEXT service
X2TEXT_HOST=
X2TEXT_PORT=

EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
9 changes: 7 additions & 2 deletions tools/text_extractor/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,13 @@ def run(
self.stream_log("Preparing to write the extracted text.")
if source_name:
output_path = Path(output_dir) / f"{Path(source_name).stem}.txt"
with open(output_path, "w", encoding="utf-8") as file:
file.write(extracted_text)
if hasattr(self, "workflow_filestorage"):
self.workflow_filestorage.write(
path=output_path, mode="w", data=extracted_text
)
else:
with open(output_path, "w", encoding="utf-8") as file:
file.write(extracted_text)

self.stream_log("Tool output written successfully.")
else:
Expand Down
113 changes: 111 additions & 2 deletions worker/pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions worker/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies = [
"python-dotenv==1.0.0",
"redis==5.0.1",
"unstract-core @ file:///${PROJECT_ROOT}/../unstract/core",
"unstract-flags @ file:///${PROJECT_ROOT}/../unstract/flags",
]
requires-python = ">=3.9,<3.11.1"
readme = "README.md"
Expand Down
Loading

0 comments on commit 5014209

Please sign in to comment.