Skip to content

Commit

Permalink
Removing unused objects and fixing log_exception method
Browse files Browse the repository at this point in the history
  • Loading branch information
Default2882 committed Nov 27, 2024
1 parent a87bd52 commit 6243033
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 48 deletions.
57 changes: 28 additions & 29 deletions python-sdk/indexify/executor/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ def __init__(
self._base_url = f"{self._protocol}://{self._server_addr}"
self._task_store: TaskStore = TaskStore()
self._executor_id = executor_id
console.print("Starting FunctionWorker", style="cyan bold")
self._function_worker = FunctionWorker(
indexify_client=IndexifyClient(
service_url=self._base_url,
Expand Down Expand Up @@ -135,7 +134,7 @@ async def task_launcher(self):
task: Task = self._task_store.get_task(fn.task_id)

if self._executor_bootstrap_failed:
self.mark_task_as_failed(task)
self._mark_task_as_failed(task)
continue

# Bootstrap this executor. Fail the task if we can't.
Expand All @@ -161,7 +160,10 @@ async def task_launcher(self):
match task_name:
case TaskEnum.GET_RUNNABLE_TASK:
if async_task.exception():
self._console_log_exception(async_task)
self._console_log_exception(
"Task Launcher Error:",
f"Failed to get runnable tasks: {async_task.exception()}"
)
continue
result: Dict[str, Task] = await async_task
task: Task
Expand All @@ -180,8 +182,11 @@ async def task_launcher(self):
case TaskEnum.DOWNLOAD_GRAPH_TASK:
async_task: DownloadTask
if async_task.exception():
self._console_log_exception(async_task)
self.mark_task_as_failed(async_task.task)
self._console_log_exception(
f"Failed to download graph for task {async_task.task.id}\n",
f"Exception: {async_task.exception()}"
)
self._mark_task_as_failed(async_task.task)
continue
async_tasks.append(
self._downloader.download(
Expand All @@ -191,8 +196,11 @@ async def task_launcher(self):
case TaskEnum.DOWNLOAD_INPUT_TASK:
async_task: DownloadTask
if async_task.exception():
self._console_log_exception(async_task)
self.mark_task_as_failed(async_task.task)
self._console_log_exception(
f"Failed to download input for task {async_task.task.id}\n",
f"Exception: {async_task.exception()}"
)
self._mark_task_as_failed(async_task.task)
continue
downloaded_inputs: DownloadedInputs = await async_task
task: Task = async_task.task
Expand All @@ -209,7 +217,7 @@ async def task_launcher(self):
case TaskEnum.RUN_FUNCTION_TASK:
async_task: RunFunctionTask
if async_task.exception():
self.mark_task_as_failed(
self._mark_task_as_failed(
async_task.task, str(async_task.exception())
)
continue
Expand All @@ -234,35 +242,26 @@ async def task_launcher(self):
self._task_store.retriable_failure(async_task.task.id)
continue
except Exception as e:
console.print(
Text(
f"Failed to execute task {async_task.task.id}\n",
style="red bold",
)
+ Text(f"Exception: {e}", style="red")
self._console_log_exception(
f"Failed to execute task {async_task.task.id}\n",
f"Exception: {e}"
)
completed_task = CompletedTask(
task=async_task.task,
task_outcome="failure",
outputs=[],
self._mark_task_as_failed(
async_task.task, str(e)
)
self._task_store.complete(outcome=completed_task)
continue
case _:
raise ValueError(
f"'{async_task.get_name()}' is not a valid task name."
)

def _console_log_exception(self, async_task: asyncio.Task):
console.print(
Text("Task Launcher Error: ", style="red bold")
+ Text(
f"Failed to get runnable tasks: {async_task.exception()}",
style="red",
)
)
def _console_log_exception(self, *args: str):
errorMessage = None
for arg in args:
error_message = Text(arg) if errorMessage is None else error_message + arg
console.print(Text(error_message, style="red bold"))

def mark_task_as_failed(self, task: Task, stderr: str = None):
def _mark_task_as_failed(self, task: Task, stderr: str = None):
completed_task = CompletedTask(
task=task,
outputs=[],
Expand All @@ -288,7 +287,7 @@ def _try_bootstrap(self, task: Task) -> bool:
)

self._executor_bootstrap_failed = True
self.mark_task_as_failed(task)
self._mark_task_as_failed(task)
return False

async def run(self):
Expand Down
7 changes: 0 additions & 7 deletions python-sdk/indexify/executor/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,6 @@ async def download_graph(self, namespace: str, name: str, version: int) -> str:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "wb") as f:
f.write(response.content)
console.print(
Panel(
f"Graph Downloaded",
title="downloader",
border_style="cyan",
)
)
return path

async def download_input(self, task: Task) -> DownloadedInputs:
Expand Down
12 changes: 0 additions & 12 deletions python-sdk/indexify/executor/function_worker/function_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,6 @@ class FunctionOutput(BaseModel):
stdout: str = ""
stderr: str = ""


class Job(BaseModel):
namespace: str
graph_name: str
fn_name: str
input: IndexifyData
code_path: str
version: int
init_value: Optional[IndexifyData] = None
invocation_id: Optional[str] = None


class FunctionWorker:
def __init__(
self,
Expand Down

0 comments on commit 6243033

Please sign in to comment.