Skip to content

Commit

Permalink
Merge pull request #1048 from tensorlakeai/seriousben/increase-task-r…
Browse files Browse the repository at this point in the history
…eporter-timeout

bug(sdk): increase task reporting timeout from 5s to 5m
  • Loading branch information
seriousben authored Nov 20, 2024
2 parents 907b98a + 5b190b2 commit 1c4fb10
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 15 deletions.
15 changes: 13 additions & 2 deletions python-sdk/indexify/executor/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,15 @@ async def task_completion_reporter(self):
console.print(Text("Starting task completion reporter", style="bold cyan"))
# We should copy only the keys and not the values
url = f"{self._protocol}://{self._server_addr}/write_content"

while True:
outcomes = await self._task_store.task_outcomes()
for task_outcome in outcomes:
retryStr = (
f"\nRetries: {task_outcome.reporting_retries}"
if task_outcome.reporting_retries > 0
else ""
)
outcome = task_outcome.task_outcome
style_outcome = (
f"[bold red] {outcome} [/]"
Expand All @@ -125,7 +131,9 @@ async def task_completion_reporter(self):
Panel(
f"Reporting outcome of task: {task_outcome.task.id}, function: {task_outcome.task.compute_fn}\n"
f"Outcome: {style_outcome}\n"
f"Num Fn Outputs: {len(task_outcome.outputs or [])} Router Output: {task_outcome.router_output}",
f"Num Fn Outputs: {len(task_outcome.outputs or [])}\n"
f"Router Output: {task_outcome.router_output}\n"
f"Retries: {task_outcome.reporting_retries}",
title="Task Completion",
border_style="info",
)
Expand All @@ -139,11 +147,14 @@ async def task_completion_reporter(self):
console.print(
Panel(
f"Failed to report task {task_outcome.task.id}\n"
f"Exception: {e}\nRetrying...",
f"Exception: {type(e).__name__}({e})\n"
f"Retries: {task_outcome.reporting_retries}\n"
"Retrying...",
title="Reporting Error",
border_style="error",
)
)
task_outcome.reporting_retries += 1
await asyncio.sleep(5)
continue

Expand Down
61 changes: 49 additions & 12 deletions python-sdk/indexify/executor/task_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from typing import Optional

import nanoid
from httpx import Timeout
from pydantic import BaseModel
from rich import print

from indexify.common_util import get_httpx_client
Expand All @@ -21,6 +23,15 @@ def __bool__(self):
UTF_8_CONTENT_TYPE = "application/octet-stream"


class ReportingData(BaseModel):
    """Running tally of the payloads attached to one task-outcome report.

    All counters start at zero and are incremented by
    ``TaskReporter.report_task_outcome`` while it assembles the upload; in
    the visible code they feed the summary line printed before the request.
    """

    # Number of function outputs that were serialized for upload.
    output_count: int = 0
    # Sum of len() over the serialized function outputs.
    output_total_bytes: int = 0
    # Incremented once when the completed task has a non-empty stdout.
    stdout_count: int = 0
    # len() of the stdout value; CompletedTask declares stdout as Optional[str],
    # so this is presumably a character count rather than encoded bytes.
    stdout_total_bytes: int = 0
    # Incremented once when the completed task has a non-empty stderr.
    stderr_count: int = 0
    # len() of the stderr value; same str-length caveat as stdout_total_bytes.
    stderr_total_bytes: int = 0


class TaskReporter:
def __init__(
self, base_url: str, executor_id: str, config_path: Optional[str] = None
Expand All @@ -30,11 +41,10 @@ def __init__(
self._client = get_httpx_client(config_path)

def report_task_outcome(self, completed_task: CompletedTask):

report = ReportingData()
fn_outputs = []
for output in completed_task.outputs or []:
print(
f"[bold]task-reporter[/bold] uploading output of size: {len(output.payload)} bytes"
)
serializer = get_serializer(output.encoder)
serialized_output = serializer.serialize(output.payload)
fn_outputs.append(
Expand All @@ -43,11 +53,10 @@ def report_task_outcome(self, completed_task: CompletedTask):
(nanoid.generate(), serialized_output, serializer.content_type),
)
)
report.output_count += 1
report.output_total_bytes += len(serialized_output)

if completed_task.stdout:
print(
f"[bold]task-reporter[/bold] uploading stdout of size: {len(completed_task.stdout)}"
)
fn_outputs.append(
(
"stdout",
Expand All @@ -58,11 +67,10 @@ def report_task_outcome(self, completed_task: CompletedTask):
),
)
)
report.stdout_count += 1
report.stdout_total_bytes += len(completed_task.stdout)

if completed_task.stderr:
print(
f"[bold]task-reporter[/bold] uploading stderr of size: {len(completed_task.stderr)}"
)
fn_outputs.append(
(
"stderr",
Expand All @@ -73,6 +81,8 @@ def report_task_outcome(self, completed_task: CompletedTask):
),
)
)
report.stderr_count += 1
report.stderr_total_bytes += len(completed_task.stderr)

router_output = (
ApiRouterOutput(edges=completed_task.router_output.edges)
Expand All @@ -93,7 +103,30 @@ def report_task_outcome(self, completed_task: CompletedTask):
)
task_result_data = task_result.model_dump_json(exclude_none=True)

kwargs = {"data": {"task_result": task_result_data}}
total_bytes = (
report.output_total_bytes
+ report.stdout_total_bytes
+ report.stderr_total_bytes
)

# One-line summary of what is about to be uploaded. `total_bytes` covers
# outputs + stdout + stderr, whereas `output_bytes` must report only the
# serialized function outputs (it previously repeated `total_bytes`).
print(
    f"[bold]task-reporter[/bold] reporting task outcome "
    f"task_id={completed_task.task.id} retries={completed_task.reporting_retries} "
    f"total_bytes={total_bytes} total_files={report.output_count + report.stdout_count + report.stderr_count} "
    f"output_files={report.output_count} output_bytes={report.output_total_bytes} "
    f"stdout_bytes={report.stdout_total_bytes} stderr_bytes={report.stderr_total_bytes} "
)

# Keyword arguments that are later expanded with **kwargs into the reporting call.
kwargs = {
"data": {"task_result": task_result_data},
# The positional 5.0 applies a 5s limit (httpx's own default) to every
# timeout type that is not overridden by keyword.
# The read timeout is raised to 5 minutes to allow for large file uploads.
"timeout": Timeout(
5.0,
read=5.0 * 60,
),
}
if fn_outputs and len(fn_outputs) > 0:
kwargs["files"] = fn_outputs
else:
Expand All @@ -104,11 +137,15 @@ def report_task_outcome(self, completed_task: CompletedTask):
**kwargs,
)
except Exception as e:
print(f"failed to report task outcome {e}")
print(
f"[bold]task-reporter[/bold] failed to report task outcome retries={completed_task.reporting_retries} {type(e).__name__}({e})"
)
raise e

try:
response.raise_for_status()
except Exception as e:
print(f"failed to report task outcome {response.text}")
print(
f"[bold]task-reporter[/bold] failed to report task outcome retries={completed_task.reporting_retries} {response.text}"
)
raise e
1 change: 1 addition & 0 deletions python-sdk/indexify/executor/task_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class CompletedTask(BaseModel):
stdout: Optional[str] = None
stderr: Optional[str] = None
reducer: bool = False
reporting_retries: int = 0


class TaskStore:
Expand Down
2 changes: 1 addition & 1 deletion server/src/routes/internal_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub struct InvokeWithFile {
/// Upload data to a compute graph
#[utoipa::path(
post,
path = "/namespaces/{namespace}/compute_graphs/{compute_graph}/invoke_file",
path = "internal/ingest_files",
request_body(content_type = "multipart/form-data", content = inline(InvokeWithFile)),
tag = "ingestion",
responses(
Expand Down

0 comments on commit 1c4fb10

Please sign in to comment.