diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index c4bb84536..a326381ef 100644 --- a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -112,9 +112,15 @@ async def task_completion_reporter(self): console.print(Text("Starting task completion reporter", style="bold cyan")) # We should copy only the keys and not the values url = f"{self._protocol}://{self._server_addr}/write_content" + while True: outcomes = await self._task_store.task_outcomes() for task_outcome in outcomes: + retryStr = ( + f"\nRetries: {task_outcome.reporting_retries}" + if task_outcome.reporting_retries > 0 + else "" + ) outcome = task_outcome.task_outcome style_outcome = ( f"[bold red] {outcome} [/]" @@ -125,7 +131,9 @@ async def task_completion_reporter(self): Panel( f"Reporting outcome of task: {task_outcome.task.id}, function: {task_outcome.task.compute_fn}\n" f"Outcome: {style_outcome}\n" - f"Num Fn Outputs: {len(task_outcome.outputs or [])} Router Output: {task_outcome.router_output}", + f"Num Fn Outputs: {len(task_outcome.outputs or [])}\n" + f"Router Output: {task_outcome.router_output}\n" + f"Retries: {task_outcome.reporting_retries}", title="Task Completion", border_style="info", ) @@ -139,11 +147,14 @@ async def task_completion_reporter(self): console.print( Panel( f"Failed to report task {task_outcome.task.id}\n" - f"Exception: {e}\nRetrying...", + f"Exception: {type(e).__name__}({e})\n" + f"Retries: {task_outcome.reporting_retries}\n" + "Retrying...", title="Reporting Error", border_style="error", ) ) + task_outcome.reporting_retries += 1 await asyncio.sleep(5) continue diff --git a/python-sdk/indexify/executor/task_reporter.py b/python-sdk/indexify/executor/task_reporter.py index 55766d1d7..c905eb94e 100644 --- a/python-sdk/indexify/executor/task_reporter.py +++ b/python-sdk/indexify/executor/task_reporter.py @@ -2,6 +2,8 @@ from typing import Optional import nanoid +from httpx import Timeout +from pydantic import BaseModel from rich import print from indexify.common_util import get_httpx_client @@ -21,6 +23,15 @@ def __bool__(self): UTF_8_CONTENT_TYPE = "application/octet-stream" +class ReportingData(BaseModel): + output_count: int = 0 + output_total_bytes: int = 0 + stdout_count: int = 0 + stdout_total_bytes: int = 0 + stderr_count: int = 0 + stderr_total_bytes: int = 0 + + class TaskReporter: def __init__( self, base_url: str, executor_id: str, config_path: Optional[str] = None @@ -30,11 +41,10 @@ def __init__( self._client = get_httpx_client(config_path) def report_task_outcome(self, completed_task: CompletedTask): + + report = ReportingData() fn_outputs = [] for output in completed_task.outputs or []: - print( - f"[bold]task-reporter[/bold] uploading output of size: {len(output.payload)} bytes" - ) serializer = get_serializer(output.encoder) serialized_output = serializer.serialize(output.payload) fn_outputs.append( @@ -43,11 +53,10 @@ def report_task_outcome(self, completed_task: CompletedTask): (nanoid.generate(), serialized_output, serializer.content_type), ) ) + report.output_count += 1 + report.output_total_bytes += len(serialized_output) if completed_task.stdout: - print( - f"[bold]task-reporter[/bold] uploading stdout of size: {len(completed_task.stdout)}" - ) fn_outputs.append( ( "stdout", @@ -58,11 +67,10 @@ def report_task_outcome(self, completed_task: CompletedTask): ), ) ) + report.stdout_count += 1 + report.stdout_total_bytes += len(completed_task.stdout) if completed_task.stderr: - print( - f"[bold]task-reporter[/bold] uploading stderr of size: {len(completed_task.stderr)}" - ) fn_outputs.append( ( "stderr", @@ -73,6 +81,8 @@ def report_task_outcome(self, completed_task: CompletedTask): ), ) ) + report.stderr_count += 1 + report.stderr_total_bytes += len(completed_task.stderr) router_output = ( ApiRouterOutput(edges=completed_task.router_output.edges) @@ -93,7 +103,30 @@ def report_task_outcome(self, completed_task: CompletedTask): ) task_result_data = task_result.model_dump_json(exclude_none=True) - kwargs = {"data": {"task_result": task_result_data}} + total_bytes = ( + report.output_total_bytes + + report.stdout_total_bytes + + report.stderr_total_bytes + ) + + print( + f"[bold]task-reporter[/bold] reporting task outcome " + f"task_id={completed_task.task.id} retries={completed_task.reporting_retries} " + f"total_bytes={total_bytes} total_files={report.output_count + report.stdout_count + report.stderr_count} " + f"output_files={report.output_count} output_bytes={total_bytes} " + f"stdout_bytes={report.stdout_total_bytes} stderr_bytes={report.stderr_total_bytes} " + ) + + # + kwargs = { + "data": {"task_result": task_result_data}, + # Use httpx default timeout of 5s for all timeout types. + # For read timeouts, use 5 minutes to allow for large file uploads. + "timeout": Timeout( + 5.0, + read=5.0 * 60, + ), + } if fn_outputs and len(fn_outputs) > 0: kwargs["files"] = fn_outputs else: @@ -104,11 +137,15 @@ def report_task_outcome(self, completed_task: CompletedTask): **kwargs, ) except Exception as e: - print(f"failed to report task outcome {e}") + print( + f"[bold]task-reporter[/bold] failed to report task outcome retries={completed_task.reporting_retries} {type(e).__name__}({e})" + ) raise e try: response.raise_for_status() except Exception as e: - print(f"failed to report task outcome {response.text}") + print( + f"[bold]task-reporter[/bold] failed to report task outcome retries={completed_task.reporting_retries} {response.text}" + ) raise e diff --git a/python-sdk/indexify/executor/task_store.py b/python-sdk/indexify/executor/task_store.py index c280730e4..bd97ad9f9 100644 --- a/python-sdk/indexify/executor/task_store.py +++ b/python-sdk/indexify/executor/task_store.py @@ -17,6 +17,7 @@ class CompletedTask(BaseModel): stdout: Optional[str] = None stderr: Optional[str] = None reducer: bool = False + reporting_retries: int = 0 class TaskStore: diff --git a/server/src/routes/internal_ingest.rs b/server/src/routes/internal_ingest.rs index a08e97406..723d80bab 100644 --- a/server/src/routes/internal_ingest.rs +++ b/server/src/routes/internal_ingest.rs @@ -81,7 +81,7 @@ pub struct InvokeWithFile { /// Upload data to a compute graph #[utoipa::path( post, - path = "/namespaces/{namespace}/compute_graphs/{compute_graph}/invoke_file", + path = "internal/ingest_files", request_body(content_type = "multipart/form-data", content = inline(InvokeWithFile)), tag = "ingestion", responses(