Skip to content

Commit

Permalink
handled never ending etl issue in MRQ when highlight is not selected (#…
Browse files Browse the repository at this point in the history
…570)

* handled never ending etl issue in MRQ when highlight is not selected

* Update backend/workflow_manager/endpoint/destination.py

Co-authored-by: Ritwik G <[email protected]>
Signed-off-by: vishnuszipstack <[email protected]>

---------

Signed-off-by: vishnuszipstack <[email protected]>
Co-authored-by: Ritwik G <[email protected]>
  • Loading branch information
vishnuszipstack and ritwik-g authored Aug 9, 2024
1 parent fcc771e commit 42ba8e0
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
7 changes: 6 additions & 1 deletion backend/workflow_manager/endpoint/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,17 +528,22 @@ def _push_to_queue(
settings=connector_settings, connector_id=connector.connector_id
)
with source_fs.open(input_file_path, "rb") as remote_file:
whisper_hash = None
file_content = remote_file.read()
# Convert file content to a base64 encoded string
file_content_base64 = base64.b64encode(file_content).decode("utf-8")
q_name = f"review_queue_{self.organization_id}_{workflow.id}"
if meta_data:
whisper_hash = meta_data.get("whisper-hash")
else:
whisper_hash = None
queue_result = QueueResult(
file=file_name,
whisper_hash=meta_data["whisper-hash"],
status=QueueResultStatus.SUCCESS,
result=result,
workflow_id=str(self.workflow_id),
file_content=file_content_base64,
whisper_hash=whisper_hash,
).to_dict()
# Convert the result dictionary to a JSON string
queue_result_json = json.dumps(queue_result)
Expand Down
4 changes: 2 additions & 2 deletions backend/workflow_manager/endpoint/queue_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Any
from typing import Any, Optional

from utils.constants import Common
from workflow_manager.endpoint.exceptions import UnstractQueueException
Expand Down Expand Up @@ -34,11 +34,11 @@ def get_queue_inst(connector_settings: dict[str, Any] = {}) -> UnstractQueue:
@dataclass
class QueueResult:
file: str
whisper_hash: str
status: QueueResultStatus
result: Any
workflow_id: str
file_content: str
whisper_hash: Optional[str] = None

def to_dict(self) -> Any:
return {
Expand Down

0 comments on commit 42ba8e0

Please sign in to comment.