Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changes in worker and tools related to sdk filestorage #844

Merged
merged 12 commits into from
Nov 22, 2024
Merged
1 change: 1 addition & 0 deletions docker/dockerfiles/worker.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ WORKDIR /app
COPY ${BUILD_CONTEXT_PATH} .
# Copy local dependency packages
COPY ${BUILD_PACKAGES_PATH}/core /unstract/core
COPY ${BUILD_PACKAGES_PATH}/flags /unstract/flags

RUN set -e; \
\
Expand Down
1 change: 1 addition & 0 deletions docker/dockerfiles/worker.Dockerfile.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ prompt-service
tools
unstract
!unstract/core
!unstract/flags
!worker
x2text-service
4 changes: 4 additions & 0 deletions tools/classifier/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ TOOL_DATA_DIR=../data_dir

X2TEXT_HOST=http://unstract-x2text-service
X2TEXT_PORT=3004

EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
chandrasekharan-zipstack marked this conversation as resolved.
Show resolved Hide resolved
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
55 changes: 48 additions & 7 deletions tools/classifier/src/helper.py
muhammad-ali-e marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,50 @@ def copy_source_to_output_bin(
"""
try:
output_folder_bin = Path(self.output_dir) / classification
if not output_folder_bin.is_dir():
output_folder_bin.mkdir(parents=True, exist_ok=True)

output_file = output_folder_bin / source_name
shutil.copyfile(source_file, output_file)
if hasattr(self.tool, "workflow_filestorage"):
output_file = output_folder_bin / source_name
self._copy_file(
source_fs=self.tool.workflow_filestorage,
destination_fs=self.tool.workflow_filestorage,
source_path=source_file,
destination_path=str(output_file),
)
else:
if not output_folder_bin.is_dir():
output_folder_bin.mkdir(parents=True, exist_ok=True)

output_file = output_folder_bin / source_name
shutil.copyfile(source_file, output_file)
except Exception as e:
self.tool.stream_error_and_exit(f"Error creating output file: {e}")

def _copy_file(
    self,
    source_fs: Any,
    destination_fs: Any,
    source_path: str,
    destination_path: str,
) -> None:
    """Copy a single file from one file storage to another.

    Delegates the copy to the SDK's
    ``FileStorageUtils.copy_file_to_destination``. Any exception raised
    while importing the SDK utility or performing the copy is reported via
    the tool's ``stream_error_and_exit``.

    Args:
        source_fs (Any): Storage the source file is read from.
        destination_fs (Any): Storage the destination file is written to.
        source_path (str): Path to the source file.
        destination_path (str): Path to the destination file.
    """
    try:
        # TODO: Move it to the top once SDK released with fileStorage Feature
        # Change the source fs and destination fs type to FileStorage
        from unstract.sdk.utils import FileStorageUtils

        FileStorageUtils.copy_file_to_destination(
            source_storage=source_fs,
            destination_storage=destination_fs,
            source_path=source_path,
            # The SDK call takes a list of destinations; only one is needed.
            destination_paths=[destination_path],
        )
    except Exception as e:
        # Report through the wrapped tool, as the other methods of this
        # helper do; the helper itself is not shown to expose
        # stream_error_and_exit.
        self.tool.stream_error_and_exit(f"Error copying file: {e}")

def extract_text(
self, file: str, text_extraction_adapter_id: Optional[str]
) -> Optional[str]:
Expand Down Expand Up @@ -127,8 +163,13 @@ def _extract_from_file(self, file: str) -> Optional[str]:
"""
self.tool.stream_log("Extracting text from file")
try:
with open(file, "rb") as f:
text = f.read().decode("utf-8")
if hasattr(self.tool, "workflow_filestorage"):
text = self.tool.workflow_filestorage.read(path=file, mode="rb").decode(
"utf-8"
)
else:
with open(file, "rb") as f:
text = f.read().decode("utf-8")
except Exception as e:
self.tool.stream_log(f"File error: {e}")
return None
Expand Down
4 changes: 4 additions & 0 deletions tools/structure/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ PROMPT_PORT=3003

X2TEXT_HOST=http://unstract-x2text-service
X2TEXT_PORT=3004

EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
27 changes: 23 additions & 4 deletions tools/structure/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def run(
f"Function to higlight context is not found. {PAID_FEATURE_MSG}",
level=LogLevel.WARN,
)

workflow_filestorage = getattr(self, "workflow_filestorage", None)
if tool_settings[SettingsKeys.ENABLE_SINGLE_PASS_EXTRACTION]:
index.index(
tool_id=tool_id,
Expand All @@ -124,6 +124,11 @@ def run(
reindex=True,
usage_kwargs=usage_kwargs,
process_text=process_text,
**(
{"fs": workflow_filestorage}
if workflow_filestorage is not None
else {}
),
)
if summarize_as_source:
summarize_file_hash = self._summarize_and_index(
Expand Down Expand Up @@ -291,13 +296,23 @@ def _summarize_and_index(
summarize_file_path = tool_data_dir / SettingsKeys.SUMMARIZE

summarized_context = ""
if summarize_file_path.exists():
if hasattr(self, "workflow_filestorage"):
if self.workflow_filestorage.exists(summarize_file_path):
summarized_context = self.workflow_filestorage.read(
path=summarize_file_path, mode="r"
)
elif summarize_file_path.exists():
with open(summarize_file_path, encoding="utf-8") as f:
summarized_context = f.read()
if not summarized_context:
context = ""
with open(extract_file_path, encoding="utf-8") as file:
context = file.read()
if hasattr(self, "workflow_filestorage"):
context = self.workflow_filestorage.read(
path=extract_file_path, mode="r"
)
else:
with open(extract_file_path, encoding="utf-8") as file:
context = file.read()
prompt_keys = []
for output in outputs:
prompt_keys.append(output[SettingsKeys.NAME])
Expand Down Expand Up @@ -329,6 +344,7 @@ def _summarize_and_index(
summarize_file_hash: str = ToolUtils.get_hash_from_file(
file_path=summarize_file_path
)
workflow_filestorage = getattr(self, "workflow_filestorage", None)
index.index(
tool_id=tool_id,
embedding_instance_id=embedding_instance_id,
Expand All @@ -339,6 +355,9 @@ def _summarize_and_index(
chunk_size=0,
chunk_overlap=0,
usage_kwargs=usage_kwargs,
**(
{"fs": workflow_filestorage} if workflow_filestorage is not None else {}
),
)
return summarize_file_hash

Expand Down
4 changes: 4 additions & 0 deletions tools/text_extractor/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ TOOL_DATA_DIR=
# X2TEXT service
X2TEXT_HOST=
X2TEXT_PORT=

EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
9 changes: 7 additions & 2 deletions tools/text_extractor/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,13 @@ def run(
self.stream_log("Preparing to write the extracted text.")
if source_name:
output_path = Path(output_dir) / f"{Path(source_name).stem}.txt"
with open(output_path, "w", encoding="utf-8") as file:
file.write(extracted_text)
if hasattr(self, "workflow_filestorage"):
self.workflow_filestorage.write(
path=output_path, mode="w", data=extracted_text
)
else:
with open(output_path, "w", encoding="utf-8") as file:
file.write(extracted_text)

self.stream_log("Tool output written successfully.")
else:
Expand Down
Loading