bug(sdk): increase task reporting timeout from 5s to 5m #1048

Merged 1 commit on Nov 20, 2024
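In short: the executor's outcome report keeps httpx's 5-second default for connect, write, and pool timeouts, but now allows up to 5 minutes for reads so the server has time to ingest large output uploads, and a `reporting_retries` counter is threaded through the agent and reporter logs. A minimal sketch of just the timeout piece, assuming a plain `httpx.Client` and a placeholder URL rather than the SDK's real client helper and route:

```python
import httpx

# 5s stays the default for connect/write/pool; reads get 5 minutes so the
# server can finish ingesting a large multipart upload before responding.
report_timeout = httpx.Timeout(5.0, read=5.0 * 60)

with httpx.Client() as client:
    # "http://indexify.example/report" is a hypothetical URL for illustration;
    # the SDK builds its own client and route.
    response = client.post(
        "http://indexify.example/report",
        data={"task_result": "{}"},
        timeout=report_timeout,
    )
    response.raise_for_status()
```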
15 changes: 13 additions & 2 deletions python-sdk/indexify/executor/agent.py
@@ -112,9 +112,15 @@ async def task_completion_reporter(self):
console.print(Text("Starting task completion reporter", style="bold cyan"))
# We should copy only the keys and not the values
url = f"{self._protocol}://{self._server_addr}/write_content"

while True:
outcomes = await self._task_store.task_outcomes()
for task_outcome in outcomes:
retryStr = (
f"\nRetries: {task_outcome.reporting_retries}"
if task_outcome.reporting_retries > 0
else ""
)
outcome = task_outcome.task_outcome
style_outcome = (
f"[bold red] {outcome} [/]"
@@ -125,7 +131,9 @@ async def task_completion_reporter(self):
Panel(
f"Reporting outcome of task: {task_outcome.task.id}, function: {task_outcome.task.compute_fn}\n"
f"Outcome: {style_outcome}\n"
f"Num Fn Outputs: {len(task_outcome.outputs or [])} Router Output: {task_outcome.router_output}",
f"Num Fn Outputs: {len(task_outcome.outputs or [])}\n"
f"Router Output: {task_outcome.router_output}\n"
f"Retries: {task_outcome.reporting_retries}",
title="Task Completion",
border_style="info",
)
@@ -139,11 +147,14 @@ async def task_completion_reporter(self):
console.print(
Panel(
f"Failed to report task {task_outcome.task.id}\n"
f"Exception: {e}\nRetrying...",
f"Exception: {type(e).__name__}({e})\n"
f"Retries: {task_outcome.reporting_retries}\n"
"Retrying...",
title="Reporting Error",
border_style="error",
)
)
task_outcome.reporting_retries += 1
await asyncio.sleep(5)
continue

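For orientation, here is a stripped-down sketch of the reporting loop the hunks above modify, kept self-contained by passing the task store and reporter in as arguments (the attribute the real agent uses for the reporter is not shown in this diff); `task_outcomes()`, `report_task_outcome()`, and `reporting_retries` all come from the changes above:

```python
import asyncio

async def report_outcomes_forever(task_store, reporter):
    # Simplified sketch: pull completed tasks, try to report each one,
    # and on failure bump the per-outcome retry counter before backing
    # off for five seconds; the outcome is retried on a later pass.
    while True:
        outcomes = await task_store.task_outcomes()
        for task_outcome in outcomes:
            try:
                reporter.report_task_outcome(task_outcome)
            except Exception:
                task_outcome.reporting_retries += 1
                await asyncio.sleep(5)
                continue
```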
61 changes: 49 additions & 12 deletions python-sdk/indexify/executor/task_reporter.py
@@ -2,6 +2,8 @@
from typing import Optional

import nanoid
from httpx import Timeout
from pydantic import BaseModel
from rich import print

from indexify.common_util import get_httpx_client
@@ -21,6 +23,15 @@ def __bool__(self):
UTF_8_CONTENT_TYPE = "application/octet-stream"


class ReportingData(BaseModel):
output_count: int = 0
output_total_bytes: int = 0
stdout_count: int = 0
stdout_total_bytes: int = 0
stderr_count: int = 0
stderr_total_bytes: int = 0


class TaskReporter:
def __init__(
self, base_url: str, executor_id: str, config_path: Optional[str] = None
@@ -30,11 +41,10 @@ def __init__(
self._client = get_httpx_client(config_path)

def report_task_outcome(self, completed_task: CompletedTask):

report = ReportingData()
fn_outputs = []
for output in completed_task.outputs or []:
print(
f"[bold]task-reporter[/bold] uploading output of size: {len(output.payload)} bytes"
)
serializer = get_serializer(output.encoder)
serialized_output = serializer.serialize(output.payload)
fn_outputs.append(
@@ -43,11 +53,10 @@ def report_task_outcome(self, completed_task: CompletedTask):
(nanoid.generate(), serialized_output, serializer.content_type),
)
)
report.output_count += 1
report.output_total_bytes += len(serialized_output)

if completed_task.stdout:
print(
f"[bold]task-reporter[/bold] uploading stdout of size: {len(completed_task.stdout)}"
)
fn_outputs.append(
(
"stdout",
@@ -58,11 +67,10 @@ def __init__(
),
)
)
report.stdout_count += 1
report.stdout_total_bytes += len(completed_task.stdout)

if completed_task.stderr:
print(
f"[bold]task-reporter[/bold] uploading stderr of size: {len(completed_task.stderr)}"
)
fn_outputs.append(
(
"stderr",
@@ -73,6 +81,8 @@
),
)
)
report.stderr_count += 1
report.stderr_total_bytes += len(completed_task.stderr)

router_output = (
ApiRouterOutput(edges=completed_task.router_output.edges)
@@ -93,7 +103,30 @@ def report_task_outcome(self, completed_task: CompletedTask):
)
task_result_data = task_result.model_dump_json(exclude_none=True)

kwargs = {"data": {"task_result": task_result_data}}
total_bytes = (
report.output_total_bytes
+ report.stdout_total_bytes
+ report.stderr_total_bytes
)

print(
f"[bold]task-reporter[/bold] reporting task outcome "
f"task_id={completed_task.task.id} retries={completed_task.reporting_retries} "
f"total_bytes={total_bytes} total_files={report.output_count + report.stdout_count + report.stderr_count} "
f"output_files={report.output_count} output_bytes={report.output_total_bytes} "
f"stdout_bytes={report.stdout_total_bytes} stderr_bytes={report.stderr_total_bytes} "
)

#
kwargs = {
"data": {"task_result": task_result_data},
# Keep httpx's default of 5s for connect/write/pool timeouts,
# but allow reads up to 5 minutes so large file uploads can complete.
"timeout": Timeout(
5.0,
read=5.0 * 60,
),
}
if fn_outputs and len(fn_outputs) > 0:
kwargs["files"] = fn_outputs
else:
@@ -104,11 +137,15 @@ def report_task_outcome(self, completed_task: CompletedTask):
**kwargs,
)
except Exception as e:
print(f"failed to report task outcome {e}")
print(
f"[bold]task-reporter[/bold] failed to report task outcome retries={completed_task.reporting_retries} {type(e).__name__}({e})"
)
raise e

try:
response.raise_for_status()
except Exception as e:
print(f"failed to report task outcome {response.text}")
print(
f"[bold]task-reporter[/bold] failed to report task outcome retries={completed_task.reporting_retries} {response.text}"
)
raise e
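Taken together, the reporter changes swap three per-item prints for one summary line backed by `ReportingData` and attach the longer read timeout to the outcome POST. A condensed sketch of that flow; the multipart field names, filenames, and content types are placeholders where the diff elides them, and the URL and client are passed in rather than built from the SDK's config:

```python
from typing import List, Optional

import httpx
import nanoid
from pydantic import BaseModel

UTF_8_CONTENT_TYPE = "application/octet-stream"


class ReportingData(BaseModel):
    # Tallies used only for the summary log line.
    output_count: int = 0
    output_total_bytes: int = 0
    stdout_count: int = 0
    stdout_total_bytes: int = 0
    stderr_count: int = 0
    stderr_total_bytes: int = 0


def report_outcome(
    client: httpx.Client,
    url: str,
    task_result_json: str,
    outputs: List[bytes],
    stdout: Optional[str] = None,
    stderr: Optional[str] = None,
) -> None:
    report = ReportingData()
    files = []

    for payload in outputs:
        # "output" is a placeholder field name; the real one is elided above.
        files.append(("output", (nanoid.generate(), payload, UTF_8_CONTENT_TYPE)))
        report.output_count += 1
        report.output_total_bytes += len(payload)

    if stdout:
        files.append(("stdout", (nanoid.generate(), stdout.encode(), UTF_8_CONTENT_TYPE)))
        report.stdout_count += 1
        report.stdout_total_bytes += len(stdout)

    if stderr:
        files.append(("stderr", (nanoid.generate(), stderr.encode(), UTF_8_CONTENT_TYPE)))
        report.stderr_count += 1
        report.stderr_total_bytes += len(stderr)

    total_bytes = (
        report.output_total_bytes + report.stdout_total_bytes + report.stderr_total_bytes
    )
    total_files = report.output_count + report.stdout_count + report.stderr_count
    print(f"task-reporter: total_files={total_files} total_bytes={total_bytes}")

    kwargs = {
        "data": {"task_result": task_result_json},
        # 5s default everywhere except reads, which get 5 minutes.
        "timeout": httpx.Timeout(5.0, read=5.0 * 60),
    }
    if files:
        kwargs["files"] = files
    response = client.post(url, **kwargs)
    response.raise_for_status()
```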
1 change: 1 addition & 0 deletions python-sdk/indexify/executor/task_store.py
@@ -17,6 +17,7 @@ class CompletedTask(BaseModel):
stdout: Optional[str] = None
stderr: Optional[str] = None
reducer: bool = False
reporting_retries: int = 0


class TaskStore:
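The one-line model change above is what lets the retry count persist: pydantic models are mutable by default, so the agent can bump the counter in place while the outcome stays queued in the task store. A minimal sketch using only fields visible in this diff:

```python
from typing import Optional

from pydantic import BaseModel


class CompletedTask(BaseModel):
    # Only the fields shown in the diff; the real model carries more
    # (task, outputs, router_output, ...).
    stdout: Optional[str] = None
    stderr: Optional[str] = None
    reducer: bool = False
    reporting_retries: int = 0  # bumped by the agent on each failed report


outcome = CompletedTask(stdout="hello")
outcome.reporting_retries += 1  # after a failed report attempt
assert outcome.reporting_retries == 1
```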
2 changes: 1 addition & 1 deletion server/src/routes/internal_ingest.rs
@@ -81,7 +81,7 @@ pub struct InvokeWithFile {
/// Upload data to a compute graph
#[utoipa::path(
post,
path = "/namespaces/{namespace}/compute_graphs/{compute_graph}/invoke_file",
path = "internal/ingest_files",
request_body(content_type = "multipart/form-data", content = inline(InvokeWithFile)),
tag = "ingestion",
responses(