From fecd1bb269c82c2ff94d45b56340dd291ae5d9ba Mon Sep 17 00:00:00 2001 From: Default28 Date: Wed, 20 Nov 2024 12:13:38 +0530 Subject: [PATCH 01/25] [WIP] refactored function worker into a separate folder --- .../executor/function_worker/__init__.py | 5 + .../{ => function_worker}/function_worker.py | 129 +++++++++++++----- .../function_worker/function_worker_utils.py | 17 +++ 3 files changed, 114 insertions(+), 37 deletions(-) create mode 100644 python-sdk/indexify/executor/function_worker/__init__.py rename python-sdk/indexify/executor/{ => function_worker}/function_worker.py (64%) create mode 100644 python-sdk/indexify/executor/function_worker/function_worker_utils.py diff --git a/python-sdk/indexify/executor/function_worker/__init__.py b/python-sdk/indexify/executor/function_worker/__init__.py new file mode 100644 index 000000000..bf0d66e3d --- /dev/null +++ b/python-sdk/indexify/executor/function_worker/__init__.py @@ -0,0 +1,5 @@ +from .function_worker import FunctionWorker + +__all__ = [ + "FunctionWorker" +] diff --git a/python-sdk/indexify/executor/function_worker.py b/python-sdk/indexify/executor/function_worker/function_worker.py similarity index 64% rename from python-sdk/indexify/executor/function_worker.py rename to python-sdk/indexify/executor/function_worker/function_worker.py index 3ea594a8e..ea88029cb 100644 --- a/python-sdk/indexify/executor/function_worker.py +++ b/python-sdk/indexify/executor/function_worker/function_worker.py @@ -1,12 +1,18 @@ import sys import traceback +from asyncio import Future +from collections import deque from typing import Dict, List, Optional +import multiprocessing as mp import cloudpickle from pydantic import BaseModel +from pydantic.dataclasses import dataclass from rich import print from indexify import IndexifyClient +from indexify.executor.function_worker.function_worker_utils import \ + get_optimal_process_count from indexify.functions_sdk.data_objects import ( FunctionWorkerOutput, IndexifyData, @@ -75,60 +81,109 @@ def _load_function( ) function_wrapper_map[key] = function_wrapper +@dataclass +class Job(BaseModel): + namespace: str + graph_name: str + fn_name: str + input: IndexifyData + code_path: str + version: int + init_value: Optional[IndexifyData] = None + invocation_id: Optional[str] = None class FunctionWorker: def __init__( - self, workers: int = 1, indexify_client: IndexifyClient = None + self, + workers: int = get_optimal_process_count(), + pool_size: int = 1000, + indexify_client: IndexifyClient = None ) -> None: - self._executor: concurrent.futures.ProcessPoolExecutor = ( - concurrent.futures.ProcessPoolExecutor(max_workers=workers) - ) - self._workers = workers - self._indexify_client = indexify_client + self._workers: int = workers + self._indexify_client: IndexifyClient = indexify_client + self.job_queue: mp.Queue[(Future, Job)] = mp.Queue(maxsize=pool_size) + self.shutdown_event = mp.Event() + self.running_processes: list[mp.Process] = [] + self.finished_jobs = mp.Queue() + self._run() - async def async_submit( - self, - namespace: str, - graph_name: str, - fn_name: str, - input: IndexifyData, - code_path: str, - version: int, - init_value: Optional[IndexifyData] = None, - invocation_id: Optional[str] = None, - ) -> FunctionWorkerOutput: + def _run(self): + while not self.shutdown_event.is_set(): + if not self.job_queue.empty(): + if len(self.running_processes) < self._workers: + future, job = self.job_queue.get() + process = mp.Process( + target=self._run_process, + args=(future, job) + ) + process.start() + 
self.running_processes.append(process) + else: + # pass and wait for some processes to finish. + continue + + def _run_process(self, future: Future, job: Job): try: result = _run_function( - namespace, - graph_name, - fn_name, - input, - code_path, - version, - init_value, - invocation_id, + job.namespace, + job.graph_name, + job.fn_name, + job.input, + job.code_path, + job.version, + job.init_value, + job.invocation_id, self._indexify_client, ) - # TODO - bring back running in a separate process + future.set_result(FunctionWorkerOutput( + fn_outputs=result.fn_outputs, + router_output=result.router_output, + stdout=result.stdout, + stderr=result.stderr, + reducer=result.reducer, + success=result.success, + )) except Exception as e: - return FunctionWorkerOutput( + future.set_result(FunctionWorkerOutput( stdout=e.stdout, stderr=e.stderr, reducer=e.is_reducer, success=False, - ) + )) - return FunctionWorkerOutput( - fn_outputs=result.fn_outputs, - router_output=result.router_output, - stdout=result.stdout, - stderr=result.stderr, - reducer=result.reducer, - success=result.success, - ) + async def async_submit( + self, + namespace: str, + graph_name: str, + fn_name: str, + input: IndexifyData, + code_path: str, + version: int, + init_value: Optional[IndexifyData] = None, + invocation_id: Optional[str] = None, + ) -> Future: + completion_future = Future() + self.job_queue.put((completion_future, Job( + namespace=namespace, + graph_name=graph_name, + fn_name=fn_name, + input=input, + code_path=code_path, + version=version, + init_value=init_value, + invocation_id=invocation_id, + indexify_client=self._indexify_client, + ))) + return completion_future def shutdown(self): - self._executor.shutdown(wait=True, cancel_futures=True) + self.shutdown_event.set() + # kill the processes when we receive shutdown signal + print( + f"[bold] function_worker: [/bold] Shutdown signal received, killing {len(self.running_processes)} processes" + ) + for process in self.running_processes: + process.kill() def _run_function( diff --git a/python-sdk/indexify/executor/function_worker/function_worker_utils.py b/python-sdk/indexify/executor/function_worker/function_worker_utils.py new file mode 100644 index 000000000..ea8bb9216 --- /dev/null +++ b/python-sdk/indexify/executor/function_worker/function_worker_utils.py @@ -0,0 +1,17 @@ +import os + +import nanoid + + +def get_optimal_process_count(): + """ + Returns a reasonable number of processes based on CPU cores. + Generally CPU cores - 1 to leave one core for the OS/other tasks. 
+ """ + return max(os.cpu_count() - 1, 1) + +def job_generator() -> str: + """ + Generates job ID + """ + return nanoid.generate() From a5240736c950893267810f118dd02232da7287dd Mon Sep 17 00:00:00 2001 From: Default28 Date: Wed, 20 Nov 2024 12:14:21 +0530 Subject: [PATCH 02/25] Removed unused imports from cli.py --- python-sdk/indexify/cli.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python-sdk/indexify/cli.py b/python-sdk/indexify/cli.py index 2e085c88d..d9532df67 100644 --- a/python-sdk/indexify/cli.py +++ b/python-sdk/indexify/cli.py @@ -16,7 +16,6 @@ from rich.theme import Theme from indexify.executor.agent import ExtractorAgent -from indexify.executor.function_worker import FunctionWorker from indexify.functions_sdk.image import ( DEFAULT_IMAGE_3_10, DEFAULT_IMAGE_3_11, @@ -250,7 +249,6 @@ def _build_image(image: Image, python_sdk_path: Optional[str] = None): docker_file += "\n".join(run_strs) print(os.getcwd()) - import docker import docker.api.build docker.api.build.process_dockerfile = lambda dockerfile, path: ( From 00833ad51e28857ee55c33b307f2fcdc4334bec7 Mon Sep 17 00:00:00 2001 From: Default28 Date: Wed, 20 Nov 2024 12:15:11 +0530 Subject: [PATCH 03/25] [WIP] using multiprocessing module to create processes for function worker --- python-sdk/indexify/executor/agent.py | 24 +++++++++++++------ .../indexify/executor/executor_tasks.py | 2 +- .../indexify/executor/indexify_executor.py | 4 ++-- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index 8b8083460..eeeaf0be9 100644 --- a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -26,7 +26,7 @@ from .api_objects import ExecutorMetadata, Task from .downloader import DownloadedInputs, Downloader from .executor_tasks import DownloadGraphTask, DownloadInputTask, ExtractTask -from .function_worker import FunctionWorker +from indexify.executor.function_worker import FunctionWorker from .runtime_probes import ProbeInfo, RuntimeProbes from .task_reporter import TaskReporter from .task_store import CompletedTask, TaskStore @@ -163,7 +163,7 @@ async def task_completion_reporter(self): self._task_store.mark_reported(task_id=task_outcome.task.id) async def task_launcher(self): - async_tasks: List[asyncio.Task] = [] + async_tasks: List[asyncio.Task | asyncio.Future] = [] fn_queue: List[FunctionInput] = [] async_tasks.append( @@ -215,13 +215,23 @@ async def task_launcher(self): continue async_tasks.append( - ExtractTask( - function_worker=self._function_worker, - task=task, - input=fn.input, - code_path=f"{self._code_path}/{task.namespace}/{task.compute_graph}.{task.graph_version}", + self._function_worker.async_submit( + namespace=task.namespace, + graph_name=task.compute_graph, + fn_name=task.compute_fn, + input=input, init_value=fn.init_value, + code_path=f"{self._code_path}/{task.namespace}/{task.compute_graph}.{task.graph_version}", + version=task.graph_version, + invocation_id=task.invocation_id, ) + # ExtractTask( + # function_worker=self._function_worker, + # task=task, + # input=fn.input, + # code_path=f"{self._code_path}/{task.namespace}/{task.compute_graph}.{task.graph_version}", + # init_value=fn.init_value, + # ) ) fn_queue = [] diff --git a/python-sdk/indexify/executor/executor_tasks.py b/python-sdk/indexify/executor/executor_tasks.py index e4477a169..e89793f5b 100644 --- a/python-sdk/indexify/executor/executor_tasks.py +++ b/python-sdk/indexify/executor/executor_tasks.py @@ -5,7 +5,7 @@ from 
.api_objects import Task from .downloader import Downloader -from .function_worker import FunctionWorker +from indexify.executor.function_worker import FunctionWorker class DownloadGraphTask(asyncio.Task): diff --git a/python-sdk/indexify/executor/indexify_executor.py b/python-sdk/indexify/executor/indexify_executor.py index 087c1096d..1305a91f8 100644 --- a/python-sdk/indexify/executor/indexify_executor.py +++ b/python-sdk/indexify/executor/indexify_executor.py @@ -1,10 +1,10 @@ import asyncio -from typing import List, Optional +from typing import Optional import nanoid from .agent import ExtractorAgent -from .function_worker import FunctionWorker +from indexify.executor.function_worker.function_worker import FunctionWorker def join( From 57a3beca6aae4d54c4f2b7d79092d061095ce2b9 Mon Sep 17 00:00:00 2001 From: Default28 Date: Wed, 20 Nov 2024 12:16:11 +0530 Subject: [PATCH 04/25] Added server/indexify_server_state in .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 87cfd442e..8c1997914 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ charts/*.tgz indexify_storage indexify_local_runner_cache server/indexify_storage +server/indexify_server_state local_cache .dev-tls src/state/store/snapshots/* From 8b54182668a17062ccff5c509ab6b6f728e34700 Mon Sep 17 00:00:00 2001 From: Default28 Date: Wed, 20 Nov 2024 12:24:08 +0530 Subject: [PATCH 05/25] Removing unused imports --- .../executor/function_worker/function_worker.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/python-sdk/indexify/executor/function_worker/function_worker.py b/python-sdk/indexify/executor/function_worker/function_worker.py index ea88029cb..22d49672e 100644 --- a/python-sdk/indexify/executor/function_worker/function_worker.py +++ b/python-sdk/indexify/executor/function_worker/function_worker.py @@ -1,13 +1,11 @@ import sys import traceback from asyncio import Future -from collections import deque from typing import Dict, List, Optional import multiprocessing as mp import cloudpickle from pydantic import BaseModel -from pydantic.dataclasses import dataclass from rich import print from indexify import IndexifyClient @@ -21,17 +19,12 @@ from indexify.functions_sdk.indexify_functions import ( FunctionCallResult, GraphInvocationContext, - IndexifyFunction, IndexifyFunctionWrapper, - IndexifyRouter, RouterCallResult, ) function_wrapper_map: Dict[str, IndexifyFunctionWrapper] = {} -import concurrent.futures - - class FunctionRunException(Exception): def __init__( self, exception: Exception, stdout: str, stderr: str, is_reducer: bool @@ -81,7 +74,6 @@ def _load_function( ) function_wrapper_map[key] = function_wrapper -@dataclass class Job(BaseModel): namespace: str graph_name: str @@ -101,7 +93,7 @@ def __init__( ) -> None: self._workers: int = workers self._indexify_client: IndexifyClient = indexify_client - self.job_queue: mp.Queue[(Future, Job)] = mp.Queue(maxsize=pool_size) + self.job_queue: mp.Queue[tuple[Future, Job]] = mp.Queue(maxsize=pool_size) self.shutdown_event = mp.Event() self.running_processes: list[mp.Process] = [] self.finished_jobs = mp.Queue() From c66c59c3292811106d62f7de6546e5bf188dceb5 Mon Sep 17 00:00:00 2001 From: Default28 Date: Wed, 20 Nov 2024 12:25:20 +0530 Subject: [PATCH 06/25] Ran make fmt --- python-sdk/indexify/executor/agent.py | 2 +- .../indexify/executor/executor_tasks.py | 2 +- .../executor/function_worker/__init__.py | 4 +- .../function_worker/function_worker.py | 76 +++++++++++-------- 
.../function_worker/function_worker_utils.py | 1 + .../indexify/executor/indexify_executor.py | 3 +- 6 files changed, 49 insertions(+), 39 deletions(-) diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index eeeaf0be9..e19051126 100644 --- a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -14,6 +14,7 @@ from rich.theme import Theme from indexify.common_util import get_httpx_client +from indexify.executor.function_worker import FunctionWorker from indexify.functions_sdk.data_objects import ( FunctionWorkerOutput, IndexifyData, @@ -26,7 +27,6 @@ from .api_objects import ExecutorMetadata, Task from .downloader import DownloadedInputs, Downloader from .executor_tasks import DownloadGraphTask, DownloadInputTask, ExtractTask -from indexify.executor.function_worker import FunctionWorker from .runtime_probes import ProbeInfo, RuntimeProbes from .task_reporter import TaskReporter from .task_store import CompletedTask, TaskStore diff --git a/python-sdk/indexify/executor/executor_tasks.py b/python-sdk/indexify/executor/executor_tasks.py index e89793f5b..3d59cf0b7 100644 --- a/python-sdk/indexify/executor/executor_tasks.py +++ b/python-sdk/indexify/executor/executor_tasks.py @@ -1,11 +1,11 @@ import asyncio from typing import Optional +from indexify.executor.function_worker import FunctionWorker from indexify.functions_sdk.data_objects import IndexifyData from .api_objects import Task from .downloader import Downloader -from indexify.executor.function_worker import FunctionWorker class DownloadGraphTask(asyncio.Task): diff --git a/python-sdk/indexify/executor/function_worker/__init__.py b/python-sdk/indexify/executor/function_worker/__init__.py index bf0d66e3d..1849fdc64 100644 --- a/python-sdk/indexify/executor/function_worker/__init__.py +++ b/python-sdk/indexify/executor/function_worker/__init__.py @@ -1,5 +1,3 @@ from .function_worker import FunctionWorker -__all__ = [ - "FunctionWorker" -] +__all__ = ["FunctionWorker"] diff --git a/python-sdk/indexify/executor/function_worker/function_worker.py b/python-sdk/indexify/executor/function_worker/function_worker.py index 22d49672e..25b5bbd3b 100644 --- a/python-sdk/indexify/executor/function_worker/function_worker.py +++ b/python-sdk/indexify/executor/function_worker/function_worker.py @@ -1,16 +1,17 @@ +import multiprocessing as mp import sys import traceback from asyncio import Future from typing import Dict, List, Optional -import multiprocessing as mp import cloudpickle from pydantic import BaseModel from rich import print from indexify import IndexifyClient -from indexify.executor.function_worker.function_worker_utils import \ - get_optimal_process_count +from indexify.executor.function_worker.function_worker_utils import ( + get_optimal_process_count, +) from indexify.functions_sdk.data_objects import ( FunctionWorkerOutput, IndexifyData, @@ -25,6 +26,7 @@ function_wrapper_map: Dict[str, IndexifyFunctionWrapper] = {} + class FunctionRunException(Exception): def __init__( self, exception: Exception, stdout: str, stderr: str, is_reducer: bool @@ -74,6 +76,7 @@ def _load_function( ) function_wrapper_map[key] = function_wrapper + class Job(BaseModel): namespace: str graph_name: str @@ -84,12 +87,13 @@ class Job(BaseModel): init_value: Optional[IndexifyData] = None invocation_id: Optional[str] = None + class FunctionWorker: def __init__( self, workers: int = get_optimal_process_count(), pool_size: int = 1000, - indexify_client: IndexifyClient = None + indexify_client: 
IndexifyClient = None, ) -> None: self._workers: int = workers self._indexify_client: IndexifyClient = indexify_client @@ -104,10 +108,7 @@ def _run(self): if not self.job_queue.empty(): if len(self.running_processes) < self._workers: future, job = self.job_queue.get() - process = mp.Process( - target=self._run_process, - args=(future, job) - ) + process = mp.Process(target=self._run_process, args=(future, job)) process.start() self.running_processes.append(process) else: @@ -127,21 +128,25 @@ def _run_process(self, future: Future, job: Job): job.invocation_id, self._indexify_client, ) - future.set_result(FunctionWorkerOutput( - fn_outputs=result.fn_outputs, - router_output=result.router_output, - stdout=result.stdout, - stderr=result.stderr, - reducer=result.reducer, - success=result.success, - )) + future.set_result( + FunctionWorkerOutput( + fn_outputs=result.fn_outputs, + router_output=result.router_output, + stdout=result.stdout, + stderr=result.stderr, + reducer=result.reducer, + success=result.success, + ) + ) except Exception as e: - future.set_result(FunctionWorkerOutput( - stdout=e.stdout, - stderr=e.stderr, - reducer=e.is_reducer, - success=False, - )) + future.set_result( + FunctionWorkerOutput( + stdout=e.stdout, + stderr=e.stderr, + reducer=e.is_reducer, + success=False, + ) + ) async def async_submit( self, @@ -155,17 +160,22 @@ async def async_submit( invocation_id: Optional[str] = None, ) -> Future: completion_future = Future() - self.job_queue.put((completion_future, Job( - namespace=namespace, - graph_name=graph_name, - fn_name=fn_name, - input=input, - code_path=code_path, - version=version, - init_value=init_value, - invocation_id=invocation_id, - indexify_client=self._indexify_client, - ))) + self.job_queue.put( + ( + completion_future, + Job( + namespace=namespace, + graph_name=graph_name, + fn_name=fn_name, + input=input, + code_path=code_path, + version=version, + init_value=init_value, + invocation_id=invocation_id, + indexify_client=self._indexify_client, + ), + ) + ) return completion_future def shutdown(self): diff --git a/python-sdk/indexify/executor/function_worker/function_worker_utils.py b/python-sdk/indexify/executor/function_worker/function_worker_utils.py index ea8bb9216..e244caeaf 100644 --- a/python-sdk/indexify/executor/function_worker/function_worker_utils.py +++ b/python-sdk/indexify/executor/function_worker/function_worker_utils.py @@ -10,6 +10,7 @@ def get_optimal_process_count(): """ return max(os.cpu_count() - 1, 1) + def job_generator() -> str: """ Generates job ID diff --git a/python-sdk/indexify/executor/indexify_executor.py b/python-sdk/indexify/executor/indexify_executor.py index 1305a91f8..f843b205f 100644 --- a/python-sdk/indexify/executor/indexify_executor.py +++ b/python-sdk/indexify/executor/indexify_executor.py @@ -3,9 +3,10 @@ import nanoid -from .agent import ExtractorAgent from indexify.executor.function_worker.function_worker import FunctionWorker +from .agent import ExtractorAgent + def join( workers: int, From fcd1156f104fc57bba3a54bb4ed4281ecad43209 Mon Sep 17 00:00:00 2001 From: Default28 Date: Wed, 20 Nov 2024 18:02:28 +0530 Subject: [PATCH 07/25] bad commit --- python-sdk/indexify/executor/agent.py | 3 + .../indexify/executor/executor_tasks.py | 28 ++++++ .../function_worker/function_worker.py | 92 +++++-------------- 3 files changed, 55 insertions(+), 68 deletions(-) diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index e19051126..6a53d7bf6 100644 --- 
a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -89,6 +89,7 @@ def __init__( self._task_store: TaskStore = TaskStore() self._executor_id = executor_id + console.print("Starting FunctionWorker", style="cyan bold") self._function_worker = FunctionWorker( workers=num_workers, indexify_client=IndexifyClient( @@ -360,6 +361,7 @@ async def task_launcher(self): continue async def run(self): + console.print("Starting Extractor Agent...", style="green") import signal asyncio.get_event_loop().add_signal_handler( @@ -376,6 +378,7 @@ def to_sentence_case(snake_str): words = snake_str.split("_") return words[0].capitalize() + "" + " ".join(words[1:]) + console.print("Starting Probe....") runtime_probe: ProbeInfo = self._probe.probe() executor_version = version("indexify") diff --git a/python-sdk/indexify/executor/executor_tasks.py b/python-sdk/indexify/executor/executor_tasks.py index 3d59cf0b7..372c196bc 100644 --- a/python-sdk/indexify/executor/executor_tasks.py +++ b/python-sdk/indexify/executor/executor_tasks.py @@ -71,3 +71,31 @@ def __init__( **kwargs, ) self.task = task + +class ExtractTask(asyncio.Future): + def __init__( + self, + *, + function_worker: FunctionWorker, + task: Task, + input: IndexifyData, + init_value: Optional[IndexifyData] = None, + code_path: str, + **kwargs, + ): + kwargs["name"] = "run_function" + kwargs["loop"] = asyncio.get_event_loop() + super().__init__( + function_worker.async_submit( + namespace=task.namespace, + graph_name=task.compute_graph, + fn_name=task.compute_fn, + input=input, + init_value=init_value, + code_path=code_path, + version=task.graph_version, + invocation_id=task.invocation_id, + ), + **kwargs, + ) + self.task = task diff --git a/python-sdk/indexify/executor/function_worker/function_worker.py b/python-sdk/indexify/executor/function_worker/function_worker.py index 25b5bbd3b..2e7d675f9 100644 --- a/python-sdk/indexify/executor/function_worker/function_worker.py +++ b/python-sdk/indexify/executor/function_worker/function_worker.py @@ -1,8 +1,11 @@ -import multiprocessing as mp +import asyncio import sys import traceback from asyncio import Future +from concurrent.futures import ThreadPoolExecutor +from keyword import kwlist from typing import Dict, List, Optional +from queue import Queue import cloudpickle from pydantic import BaseModel @@ -92,44 +95,28 @@ class FunctionWorker: def __init__( self, workers: int = get_optimal_process_count(), - pool_size: int = 1000, indexify_client: IndexifyClient = None, ) -> None: self._workers: int = workers self._indexify_client: IndexifyClient = indexify_client - self.job_queue: mp.Queue[tuple[Future, Job]] = mp.Queue(maxsize=pool_size) - self.shutdown_event = mp.Event() - self.running_processes: list[mp.Process] = [] - self.finished_jobs = mp.Queue() - self._run() + self._loop = asyncio.get_event_loop() + self._executor = ThreadPoolExecutor(max_workers=self._workers) + self._loop.set_default_executor(self._executor) - def _run(self): - while not self.shutdown_event.is_set(): - if not self.job_queue.empty(): - if len(self.running_processes) < self._workers: - future, job = self.job_queue.get() - process = mp.Process(target=self._run_process, args=(future, job)) - process.start() - self.running_processes.append(process) - else: - # pass and wait for some processes to finish. 
- continue - - def _run_process(self, future: Future, job: Job): + def _run_process(self, **kwargs): try: result = _run_function( - job.namespace, - job.graph_name, - job.fn_name, - job.input, - job.code_path, - job.version, - job.init_value, - job.invocation_id, + kwargs["namespace"], + kwargs["graph_name"], + kwargs["fn_name"], + kwargs["input"], + kwargs["code_path"], + kwargs["version"], + kwargs["init_value"], + kwargs["invocation_id"], self._indexify_client, ) - future.set_result( - FunctionWorkerOutput( + return FunctionWorkerOutput( fn_outputs=result.fn_outputs, router_output=result.router_output, stdout=result.stdout, @@ -137,55 +124,24 @@ def _run_process(self, future: Future, job: Job): reducer=result.reducer, success=result.success, ) - ) except Exception as e: - future.set_result( - FunctionWorkerOutput( + return FunctionWorkerOutput( stdout=e.stdout, stderr=e.stderr, reducer=e.is_reducer, success=False, ) - ) - async def async_submit( + def async_submit( self, - namespace: str, - graph_name: str, - fn_name: str, - input: IndexifyData, - code_path: str, - version: int, - init_value: Optional[IndexifyData] = None, - invocation_id: Optional[str] = None, + **kwargs ) -> Future: - completion_future = Future() - self.job_queue.put( - ( - completion_future, - Job( - namespace=namespace, - graph_name=graph_name, - fn_name=fn_name, - input=input, - code_path=code_path, - version=version, - init_value=init_value, - invocation_id=invocation_id, - indexify_client=self._indexify_client, - ), - ) - ) - return completion_future + return self._loop.run_in_executor(None, _run_function, kwargs) def shutdown(self): - self.shutdown_event.set() - # kill the processes when we receive shutdown signal - print( - f"[bold] function_worker: [/bold] Shutdown signal received, killing {len(self.running_processes)} processes" - ) - for process in self.running_processes: - process.kill() + # kill everything + self._loop.shutdown_default_executor() + self._loop.stop() def _run_function( From 85be72ca676b342a40c1b3c20c44ba450e3af330 Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 13:54:48 +0530 Subject: [PATCH 08/25] [WIP] Moved console from cli.py --- python-sdk/indexify/cli.py | 14 +------------- python-sdk/indexify/console.py | 12 ++++++++++++ 2 files changed, 13 insertions(+), 13 deletions(-) create mode 100644 python-sdk/indexify/console.py diff --git a/python-sdk/indexify/cli.py b/python-sdk/indexify/cli.py index d9532df67..9f4de9e14 100644 --- a/python-sdk/indexify/cli.py +++ b/python-sdk/indexify/cli.py @@ -10,11 +10,10 @@ import nanoid import typer -from rich.console import Console from rich.panel import Panel from rich.text import Text -from rich.theme import Theme +from indexify.console import console from indexify.executor.agent import ExtractorAgent from indexify.functions_sdk.image import ( DEFAULT_IMAGE_3_10, @@ -22,17 +21,6 @@ Image, ) -custom_theme = Theme( - { - "info": "cyan", - "warning": "yellow", - "error": "red", - "highlight": "magenta", - } -) - -console = Console(theme=custom_theme) - app = typer.Typer(pretty_exceptions_enable=False, no_args_is_help=True) diff --git a/python-sdk/indexify/console.py b/python-sdk/indexify/console.py new file mode 100644 index 000000000..d3ce32f19 --- /dev/null +++ b/python-sdk/indexify/console.py @@ -0,0 +1,12 @@ +from rich.console import Console +from rich.theme import Theme + +custom_theme = Theme( + { + "info": "cyan", + "warning": "yellow", + "error": "red", + "highlight": "magenta", + } +) +console = Console(theme=custom_theme) 
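For context, a minimal sketch of how the shared themed console introduced in the patch above can be used from other executor modules. This is illustrative only and not part of any patch in the series; it assumes the package exposes the singleton as indexify.console (as the later patches import it) and uses only the style names registered in the custom theme:

    from rich.panel import Panel

    from indexify.console import console

    # "info", "warning", "error" and "highlight" are the style names
    # registered in the shared custom theme.
    console.print("Starting FunctionWorker", style="info")
    console.print(
        Panel("Failed to report task", title="Reporting Error", border_style="error")
    )

Centralizing the Console in one module keeps the theme consistent across cli.py, agent.py and task_reporter.py instead of each module constructing its own instance.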
From 3e2a913b7782607f6f7a7ecc2eb03cb1ef0a2954 Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 13:55:22 +0530 Subject: [PATCH 09/25] [WIP] Added ThreadPool to event loop --- python-sdk/indexify/executor/agent.py | 79 ++++----------------------- 1 file changed, 10 insertions(+), 69 deletions(-) diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index 6a53d7bf6..a10f00cf2 100644 --- a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -2,6 +2,7 @@ import json import traceback from concurrent.futures.process import BrokenProcessPool +from concurrent.futures.thread import ThreadPoolExecutor from importlib.metadata import version from pathlib import Path from typing import Dict, List, Optional @@ -63,6 +64,8 @@ def __init__( name_alias: Optional[str] = None, image_version: Optional[int] = None, ): + event_loop = asyncio.get_event_loop() + event_loop.set_default_executor(ThreadPoolExecutor(max_workers=num_workers)) self.name_alias = name_alias self.image_version = image_version self._config_path = config_path @@ -91,7 +94,6 @@ def __init__( self._executor_id = executor_id console.print("Starting FunctionWorker", style="cyan bold") self._function_worker = FunctionWorker( - workers=num_workers, indexify_client=IndexifyClient( service_url=f"{self._protocol}://{server_addr}", config_path=config_path, @@ -109,60 +111,9 @@ def __init__( base_url=self._base_url, executor_id=self._executor_id, config_path=self._config_path, + task_store =self._task_store, ) - async def task_completion_reporter(self): - console.print(Text("Starting task completion reporter", style="bold cyan")) - # We should copy only the keys and not the values - url = f"{self._protocol}://{self._server_addr}/write_content" - - while True: - outcomes = await self._task_store.task_outcomes() - for task_outcome in outcomes: - retryStr = ( - f"\nRetries: {task_outcome.reporting_retries}" - if task_outcome.reporting_retries > 0 - else "" - ) - outcome = task_outcome.task_outcome - style_outcome = ( - f"[bold red] {outcome} [/]" - if "fail" in outcome - else f"[bold green] {outcome} [/]" - ) - console.print( - Panel( - f"Reporting outcome of task: {task_outcome.task.id}, function: {task_outcome.task.compute_fn}\n" - f"Outcome: {style_outcome}\n" - f"Num Fn Outputs: {len(task_outcome.outputs or [])}\n" - f"Router Output: {task_outcome.router_output}\n" - f"Retries: {task_outcome.reporting_retries}", - title="Task Completion", - border_style="info", - ) - ) - - try: - # Send task outcome to the server - self._task_reporter.report_task_outcome(completed_task=task_outcome) - except Exception as e: - # The connection was dropped in the middle of the reporting, process, retry - console.print( - Panel( - f"Failed to report task {task_outcome.task.id}\n" - f"Exception: {type(e).__name__}({e})\n" - f"Retries: {task_outcome.reporting_retries}\n" - "Retrying...", - title="Reporting Error", - border_style="error", - ) - ) - task_outcome.reporting_retries += 1 - await asyncio.sleep(5) - continue - - self._task_store.mark_reported(task_id=task_outcome.task.id) - async def task_launcher(self): async_tasks: List[asyncio.Task | asyncio.Future] = [] fn_queue: List[FunctionInput] = [] @@ -215,17 +166,9 @@ async def task_launcher(self): continue - async_tasks.append( - self._function_worker.async_submit( - namespace=task.namespace, - graph_name=task.compute_graph, - fn_name=task.compute_fn, - input=input, - init_value=fn.init_value, - 
code_path=f"{self._code_path}/{task.namespace}/{task.compute_graph}.{task.graph_version}", - version=task.graph_version, - invocation_id=task.invocation_id, - ) + code_path = f"{self._code_path}/{task.namespace}/{task.compute_graph}.{task.graph_version}" + async_tasks.append(self._function_worker.run_function(task, fn.input, fn.init_value, code_path) + # ExtractTask( # function_worker=self._function_worker, # task=task, @@ -256,7 +199,7 @@ async def task_launcher(self): task: Task for _, task in result.items(): async_tasks.append( - DownloadGraphTask(task=task, downloader=self._downloader) + self._downloader.download(task, "download_graph") ) async_tasks.append( asyncio.create_task( @@ -281,9 +224,7 @@ async def task_launcher(self): self._task_store.complete(outcome=completed_task) continue async_tasks.append( - DownloadInputTask( - task=async_task.task, downloader=self._downloader - ) + self._downloader.download(async_task.task, "download_input") ) elif async_task.get_name() == "download_input": if async_task.exception(): @@ -368,7 +309,7 @@ async def run(self): signal.SIGINT, self.shutdown, asyncio.get_event_loop() ) asyncio.create_task(self.task_launcher()) - asyncio.create_task(self.task_completion_reporter()) + asyncio.create_task(self._task_reporter.run()) self._should_run = True while self._should_run: url = f"{self._protocol}://{self._server_addr}/internal/executors/{self._executor_id}/tasks" From 297c85b89b31194891cb1d0c0de00aece354d10f Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 13:56:19 +0530 Subject: [PATCH 10/25] [WIP] Added models for run and download task --- python-sdk/indexify/executor/downloadtask.py | 22 ++++++++++++++++++++ python-sdk/indexify/executor/runtask.py | 21 +++++++++++++++++++ 2 files changed, 43 insertions(+) create mode 100644 python-sdk/indexify/executor/downloadtask.py create mode 100644 python-sdk/indexify/executor/runtask.py diff --git a/python-sdk/indexify/executor/downloadtask.py b/python-sdk/indexify/executor/downloadtask.py new file mode 100644 index 000000000..3875187b7 --- /dev/null +++ b/python-sdk/indexify/executor/downloadtask.py @@ -0,0 +1,22 @@ +import asyncio + +from indexify.executor.api_objects import Task + + +class DownloadTask(asyncio.Task): + def __init__( + self, + *, + task: Task, + coroutine, + name, + loop, + **kwargs, + ): + kwargs["name"] = name + kwargs["loop"] = loop + super().__init__( + coroutine, + **kwargs, + ) + self.task = task diff --git a/python-sdk/indexify/executor/runtask.py b/python-sdk/indexify/executor/runtask.py new file mode 100644 index 000000000..52c2cfd03 --- /dev/null +++ b/python-sdk/indexify/executor/runtask.py @@ -0,0 +1,21 @@ +import asyncio + +from indexify.executor.api_objects import Task + + +class RunFunctionTask(asyncio.Task): + def __init__( + self, + *, + task: Task, + coroutine, + loop, + **kwargs, + ): + kwargs["name"] = "run_function" + kwargs["loop"] = loop + super().__init__( + coroutine, + **kwargs, + ) + self.task = task From e93d5a2497cdc167a7e701537ca47ee2c6f1913d Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 13:56:57 +0530 Subject: [PATCH 11/25] [WIP] Using event loop in downloader.py --- python-sdk/indexify/executor/downloader.py | 15 +++- .../indexify/executor/executor_tasks.py | 79 ++++++++++++------- 2 files changed, 66 insertions(+), 28 deletions(-) diff --git a/python-sdk/indexify/executor/downloader.py b/python-sdk/indexify/executor/downloader.py index 75ac9bbdc..9c2f4fec8 100644 --- a/python-sdk/indexify/executor/downloader.py +++ 
b/python-sdk/indexify/executor/downloader.py @@ -1,3 +1,4 @@ +import asyncio import os from typing import Optional @@ -8,6 +9,7 @@ from rich.theme import Theme from indexify.functions_sdk.data_objects import IndexifyData +from .downloadtask import DownloadTask from ..common_util import get_httpx_client from ..functions_sdk.object_serializer import JsonSerializer, get_serializer @@ -31,11 +33,22 @@ class DownloadedInputs(BaseModel): class Downloader: def __init__( - self, code_path: str, base_url: str, config_path: Optional[str] = None + self, + code_path: str, + base_url: str, + config_path: Optional[str] = None, ): self.code_path = code_path self.base_url = base_url self._client = get_httpx_client(config_path) + self._event_loop = asyncio.get_event_loop() + + def download(self, task, name): + if name == 'download_graph': + coroutine = self.download_graph(task.namespace, task.compute_graph, task.graph_version) + else: + coroutine = self.download_input(task) + return DownloadTask(task=task, coroutine=coroutine, name=name, loop=self._event_loop) async def download_graph(self, namespace: str, name: str, version: int) -> str: path = os.path.join(self.code_path, namespace, f"{name}.{version}") diff --git a/python-sdk/indexify/executor/executor_tasks.py b/python-sdk/indexify/executor/executor_tasks.py index 372c196bc..2bb4a9f1f 100644 --- a/python-sdk/indexify/executor/executor_tasks.py +++ b/python-sdk/indexify/executor/executor_tasks.py @@ -1,12 +1,37 @@ import asyncio from typing import Optional +from pydantic import BaseModel + from indexify.executor.function_worker import FunctionWorker from indexify.functions_sdk.data_objects import IndexifyData from .api_objects import Task from .downloader import Downloader +class Job(BaseModel): + job_name: str + job_id: str + namespace: str + compute_graph: str + compute_fn: str + invocation_id: str + input_key: str + reducer_output_id: Optional[str] = None + graph_version: int + +def convert_job_to_task(Job): + return Task( + job_id=Job.job_id, + namespace=Job.namespace, + compute_graph=Job.compute_graph, + compute_fn=Job.compute_fn, + invocation_id=Job.invocation_id, + input_key=Job.input_key, + reducer_output_id=Job.reducer_output_id, + graph_version=Job.graph_version, + ) + class DownloadGraphTask(asyncio.Task): def __init__( @@ -72,30 +97,30 @@ def __init__( ) self.task = task -class ExtractTask(asyncio.Future): - def __init__( - self, - *, - function_worker: FunctionWorker, - task: Task, - input: IndexifyData, - init_value: Optional[IndexifyData] = None, - code_path: str, - **kwargs, - ): - kwargs["name"] = "run_function" - kwargs["loop"] = asyncio.get_event_loop() - super().__init__( - function_worker.async_submit( - namespace=task.namespace, - graph_name=task.compute_graph, - fn_name=task.compute_fn, - input=input, - init_value=init_value, - code_path=code_path, - version=task.graph_version, - invocation_id=task.invocation_id, - ), - **kwargs, - ) - self.task = task +# class ExtractTask(asyncio.Future): +# def __init__( +# self, +# *, +# function_worker: FunctionWorker, +# task: Task, +# input: IndexifyData, +# init_value: Optional[IndexifyData] = None, +# code_path: str, +# **kwargs, +# ): +# kwargs["name"] = "run_function" +# kwargs["loop"] = asyncio.get_event_loop() +# super().__init__( +# function_worker.async_submit( +# namespace=task.namespace, +# graph_name=task.compute_graph, +# fn_name=task.compute_fn, +# input=input, +# init_value=init_value, +# code_path=code_path, +# version=task.graph_version, +# invocation_id=task.invocation_id, 
+# ), +# **kwargs, +# ) +# self.task = task From bcd0e5b637f9a1964f50d5dbb6c2150508169d09 Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 13:57:32 +0530 Subject: [PATCH 12/25] [WIP] added event loop in function worker --- .../function_worker/function_worker.py | 79 ++++++++++--------- 1 file changed, 43 insertions(+), 36 deletions(-) diff --git a/python-sdk/indexify/executor/function_worker/function_worker.py b/python-sdk/indexify/executor/function_worker/function_worker.py index 2e7d675f9..6483e0d3c 100644 --- a/python-sdk/indexify/executor/function_worker/function_worker.py +++ b/python-sdk/indexify/executor/function_worker/function_worker.py @@ -1,20 +1,14 @@ import asyncio import sys import traceback -from asyncio import Future -from concurrent.futures import ThreadPoolExecutor -from keyword import kwlist from typing import Dict, List, Optional -from queue import Queue import cloudpickle from pydantic import BaseModel from rich import print from indexify import IndexifyClient -from indexify.executor.function_worker.function_worker_utils import ( - get_optimal_process_count, -) +from indexify.executor.runtask import RunFunctionTask from indexify.functions_sdk.data_objects import ( FunctionWorkerOutput, IndexifyData, @@ -94,18 +88,30 @@ class Job(BaseModel): class FunctionWorker: def __init__( self, - workers: int = get_optimal_process_count(), indexify_client: IndexifyClient = None, ) -> None: - self._workers: int = workers self._indexify_client: IndexifyClient = indexify_client self._loop = asyncio.get_event_loop() - self._executor = ThreadPoolExecutor(max_workers=self._workers) - self._loop.set_default_executor(self._executor) - def _run_process(self, **kwargs): + def run_function(self, task, fn_input, init_value, code_path): + return RunFunctionTask(task=task, coroutine=self.async_submit( + namespace=task.namespace, + graph_name=task.compute_graph, + fn_name=task.compute_fn, + input=fn_input, + init_value=init_value, + code_path=code_path, + version=task.graph_version, + invocation_id=task.invocation_id, + ), loop=self._loop) + + async def async_submit( + self, + **kwargs + ) -> FunctionWorkerOutput: try: - result = _run_function( + print(f"Submitting async function.....") + result = await _run_function( kwargs["namespace"], kwargs["graph_name"], kwargs["fn_name"], @@ -117,34 +123,28 @@ def _run_process(self, **kwargs): self._indexify_client, ) return FunctionWorkerOutput( - fn_outputs=result.fn_outputs, - router_output=result.router_output, - stdout=result.stdout, - stderr=result.stderr, - reducer=result.reducer, - success=result.success, - ) + fn_outputs=result.fn_outputs, + router_output=result.router_output, + stdout=result.stdout, + stderr=result.stderr, + reducer=result.reducer, + success=result.success, + ) except Exception as e: + print(e) return FunctionWorkerOutput( - stdout=e.stdout, - stderr=e.stderr, - reducer=e.is_reducer, - success=False, - ) - - def async_submit( - self, - **kwargs - ) -> Future: - return self._loop.run_in_executor(None, _run_function, kwargs) + stdout=e.stdout, + stderr=e.stderr, + reducer=e.is_reducer, + success=False, + ) def shutdown(self): # kill everything - self._loop.shutdown_default_executor() self._loop.stop() -def _run_function( +async def _run_function( namespace: str, graph_name: str, fn_name: str, @@ -192,9 +192,16 @@ def _run_function( print(router_call_result.traceback_msg, file=sys.stderr) has_failed = True else: - fn_call_result: FunctionCallResult = fn.invoke_fn_ser( - fn_name, input, init_value - ) + print(f"is 
function async: {fn.indexify_function.is_async}") + if not fn.indexify_function.is_async: + fn_call_result: FunctionCallResult = fn.invoke_fn_ser( + fn_name, input, init_value + ) + else: + fn_call_result: FunctionCallResult = await fn.invoke_fn_ser_async( + fn_name, input, init_value + ) + print(f"serialized function output: {fn_call_result}") is_reducer = fn.indexify_function.accumulate is not None fn_output = fn_call_result.ser_outputs if fn_call_result.traceback_msg is not None: From 45e5789fbbba0f319165e20ef0fb61156b37c171 Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 13:58:02 +0530 Subject: [PATCH 13/25] [WIP] Refactored task_reporter.py --- python-sdk/indexify/executor/task_reporter.py | 67 ++++++++++++++++++- 1 file changed, 64 insertions(+), 3 deletions(-) diff --git a/python-sdk/indexify/executor/task_reporter.py b/python-sdk/indexify/executor/task_reporter.py index c905eb94e..8c83f4b7b 100644 --- a/python-sdk/indexify/executor/task_reporter.py +++ b/python-sdk/indexify/executor/task_reporter.py @@ -1,15 +1,19 @@ -import io +import asyncio from typing import Optional +from rich.panel import Panel +from rich.text import Text + import nanoid from httpx import Timeout from pydantic import BaseModel from rich import print +from indexify.console import console from indexify.common_util import get_httpx_client from indexify.executor.api_objects import RouterOutput as ApiRouterOutput from indexify.executor.api_objects import TaskResult -from indexify.executor.task_store import CompletedTask +from indexify.executor.task_store import CompletedTask, TaskStore from indexify.functions_sdk.object_serializer import get_serializer @@ -32,13 +36,68 @@ class ReportingData(BaseModel): stderr_total_bytes: int = 0 +def _log_exception(task_outcome, e): + console.print( + Panel( + f"Failed to report task {task_outcome.task.id}\n" + f"Exception: {type(e).__name__}({e})\n" + f"Retries: {task_outcome.reporting_retries}\n" + "Retrying...", + title="Reporting Error", + border_style="error", + ) + ) + + +def _log(task_outcome): + outcome = task_outcome.task_outcome + style_outcome = ( + f"[bold red] {outcome} [/]" + if "fail" in outcome + else f"[bold green] {outcome} [/]" + ) + console.print( + Panel( + f"Reporting outcome of task: {task_outcome.task.id}, function: {task_outcome.task.compute_fn}\n" + f"Outcome: {style_outcome}\n" + f"Num Fn Outputs: {len(task_outcome.outputs or [])}\n" + f"Router Output: {task_outcome.router_output}\n" + f"Retries: {task_outcome.reporting_retries}", + title="Task Completion", + border_style="info", + ) + ) + + class TaskReporter: def __init__( - self, base_url: str, executor_id: str, config_path: Optional[str] = None + self, base_url: str, executor_id: str, task_store: TaskStore, config_path: Optional[str] = None, ): self._base_url = base_url self._executor_id = executor_id self._client = get_httpx_client(config_path) + self._task_store = task_store + + async def run(self): + console.print( + Text("Starting task completion reporter", style="bold cyan")) + # We should copy only the keys and not the values + + while True: + outcomes = await self._task_store.task_outcomes() + for task_outcome in outcomes: + _log(task_outcome) + try: + # Send task outcome to the server + self.report_task_outcome(completed_task=task_outcome) + except Exception as e: + # The connection was dropped in the middle of the reporting, process, retry + _log_exception(task_outcome, e) + task_outcome.reporting_retries += 1 + await asyncio.sleep(5) + continue + + 
self._task_store.mark_reported(task_id=task_outcome.task.id) def report_task_outcome(self, completed_task: CompletedTask): @@ -149,3 +208,5 @@ def report_task_outcome(self, completed_task: CompletedTask): f"[bold]task-reporter[/bold] failed to report task outcome retries={completed_task.reporting_retries} {response.text}" ) raise e + + From 64b3c86d2d54ab87fc0acd07ad2010a8679662b4 Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 13:58:41 +0530 Subject: [PATCH 14/25] Added async function support in indexify_function decorator --- .../functions_sdk/indexify_functions.py | 58 ++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/python-sdk/indexify/functions_sdk/indexify_functions.py b/python-sdk/indexify/functions_sdk/indexify_functions.py index b57db40ba..19c59b4d7 100644 --- a/python-sdk/indexify/functions_sdk/indexify_functions.py +++ b/python-sdk/indexify/functions_sdk/indexify_functions.py @@ -83,11 +83,15 @@ class IndexifyFunction: image: Optional[Image] = DEFAULT_IMAGE_3_10 placement_constraints: List[PlacementConstraints] = [] accumulate: Optional[Type[Any]] = None - encoder: Optional[str] = "cloudpickle" + encoder: Optional[str] = "cloudpickle", + is_async: bool = False, def run(self, *args, **kwargs) -> Union[List[Any], Any]: pass + async def async_run(self, *args, **kwargs) -> Union[List[Any], Any]: + pass + def partial(self, **kwargs) -> Callable: from functools import partial @@ -170,6 +174,9 @@ def construct(fn): def run(self, *args, **kwargs): return fn(*args, **kwargs) + async def async_run(self, *args, **kwargs): + return await fn(*args, **kwargs) + # Apply original signature and annotations to run method run.__signature__ = fn_sig run.__annotations__ = fn_hints @@ -186,6 +193,8 @@ def run(self, *args, **kwargs): "accumulate": accumulate, "encoder": encoder, "run": run, + "async_run": async_run, + "is_async": inspect.iscoroutinefunction(fn), } return type("IndexifyFunction", (IndexifyFunction,), attrs) @@ -276,6 +285,30 @@ def run_fn( ) return output, None + async def run_fn_async( + self, input: Union[Dict, Type[BaseModel]], acc: Type[Any] = None + ) -> Tuple[List[Any], Optional[str]]: + args = [] + kwargs = {} + if acc is not None: + args.append(acc) + if isinstance(input, dict): + kwargs = input + else: + args.append(input) + + try: + extracted_data = await self.indexify_function.run(*args, **kwargs) + except Exception as e: + return [], traceback.format_exc() + if extracted_data is None: + return [], None + + output = ( + extracted_data if isinstance(extracted_data, list) else [extracted_data] + ) + return output, None + def invoke_fn_ser( self, name: str, input: IndexifyData, acc: Optional[Any] = None ) -> FunctionCallResult: @@ -299,6 +332,29 @@ def invoke_fn_ser( ] return FunctionCallResult(ser_outputs=ser_outputs, traceback_msg=err) + async def invoke_fn_ser_async( + self, name: str, input: IndexifyData, acc: Optional[Any] = None + ) -> FunctionCallResult: + input = self.deserialize_input(name, input) + serializer = get_serializer(self.indexify_function.encoder) + if acc is not None: + acc = self.indexify_function.accumulate.model_validate( + serializer.deserialize(acc.payload) + ) + if acc is None and self.indexify_function.accumulate is not None: + acc = self.indexify_function.accumulate.model_validate( + self.indexify_function.accumulate() + ) + outputs, err = await self.run_fn_async(input, acc=acc) + ser_outputs = [ + IndexifyData( + payload=serializer.serialize(output), + encoder=self.indexify_function.encoder, + ) + for 
output in outputs + ] + return FunctionCallResult(ser_outputs=ser_outputs, traceback_msg=err) + def invoke_router(self, name: str, input: IndexifyData) -> RouterCallResult: input = self.deserialize_input(name, input) edges, err = self.run_router(input) From a3c2dfbdcca1be96dbfd4750002d8ebd1e873ac7 Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 14:37:18 +0530 Subject: [PATCH 15/25] Working async functions --- .../indexify/functions_sdk/indexify_functions.py | 4 ++-- python-sdk/tests/test_graph_behaviours.py | 10 ++++++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/python-sdk/indexify/functions_sdk/indexify_functions.py b/python-sdk/indexify/functions_sdk/indexify_functions.py index 19c59b4d7..d1b0add0d 100644 --- a/python-sdk/indexify/functions_sdk/indexify_functions.py +++ b/python-sdk/indexify/functions_sdk/indexify_functions.py @@ -83,8 +83,8 @@ class IndexifyFunction: image: Optional[Image] = DEFAULT_IMAGE_3_10 placement_constraints: List[PlacementConstraints] = [] accumulate: Optional[Type[Any]] = None - encoder: Optional[str] = "cloudpickle", - is_async: bool = False, + encoder: Optional[str] = "cloudpickle" + is_async: bool = False def run(self, *args, **kwargs) -> Union[List[Any], Any]: pass diff --git a/python-sdk/tests/test_graph_behaviours.py b/python-sdk/tests/test_graph_behaviours.py index 73c2b819c..3b4aa7484 100644 --- a/python-sdk/tests/test_graph_behaviours.py +++ b/python-sdk/tests/test_graph_behaviours.py @@ -226,8 +226,18 @@ def remote_or_local_pipeline(pipeline, remote=True): return RemotePipeline.deploy(pipeline) return pipeline +@indexify_function() +async def async_simple_function(x: int) -> int: + return x * x class TestGraphBehaviors(unittest.TestCase): + def test_async_simple_function(self): + graph = Graph(name="test_async_simple_function", description="test", start_node=async_simple_function) + graph = RemoteGraph.deploy(graph) + invocation_id = graph.run(block_until_done=True, x=10) + output = graph.output(invocation_id, "async_simple_function") + self.assertEqual(output, [100]) + @parameterized.expand([(False), (True)]) def test_simple_function(self, is_remote): graph = Graph( From 4e5bfbfe5ea97cb9dd40e9c14212b5bfbb644a3a Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 14:38:03 +0530 Subject: [PATCH 16/25] linting --- python-sdk/indexify/executor/agent.py | 8 +++++--- python-sdk/indexify/executor/downloader.py | 12 ++++++++---- python-sdk/indexify/executor/executor_tasks.py | 3 +++ .../function_worker/function_worker.py | 13 +++++++------ python-sdk/indexify/executor/task_reporter.py | 18 +++++++++--------- python-sdk/tests/test_graph_behaviours.py | 8 +++++++- 6 files changed, 39 insertions(+), 23 deletions(-) diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index a10f00cf2..c2ba4eabc 100644 --- a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -111,7 +111,7 @@ def __init__( base_url=self._base_url, executor_id=self._executor_id, config_path=self._config_path, - task_store =self._task_store, + task_store=self._task_store, ) async def task_launcher(self): @@ -167,8 +167,10 @@ async def task_launcher(self): continue code_path = f"{self._code_path}/{task.namespace}/{task.compute_graph}.{task.graph_version}" - async_tasks.append(self._function_worker.run_function(task, fn.input, fn.init_value, code_path) - + async_tasks.append( + self._function_worker.run_function( + task, fn.input, fn.init_value, code_path + ) # ExtractTask( # 
function_worker=self._function_worker, # task=task, diff --git a/python-sdk/indexify/executor/downloader.py b/python-sdk/indexify/executor/downloader.py index 9c2f4fec8..5b932a18a 100644 --- a/python-sdk/indexify/executor/downloader.py +++ b/python-sdk/indexify/executor/downloader.py @@ -9,11 +9,11 @@ from rich.theme import Theme from indexify.functions_sdk.data_objects import IndexifyData -from .downloadtask import DownloadTask from ..common_util import get_httpx_client from ..functions_sdk.object_serializer import JsonSerializer, get_serializer from .api_objects import Task +from .downloadtask import DownloadTask custom_theme = Theme( { @@ -44,11 +44,15 @@ def __init__( self._event_loop = asyncio.get_event_loop() def download(self, task, name): - if name == 'download_graph': - coroutine = self.download_graph(task.namespace, task.compute_graph, task.graph_version) + if name == "download_graph": + coroutine = self.download_graph( + task.namespace, task.compute_graph, task.graph_version + ) else: coroutine = self.download_input(task) - return DownloadTask(task=task, coroutine=coroutine, name=name, loop=self._event_loop) + return DownloadTask( + task=task, coroutine=coroutine, name=name, loop=self._event_loop + ) async def download_graph(self, namespace: str, name: str, version: int) -> str: path = os.path.join(self.code_path, namespace, f"{name}.{version}") diff --git a/python-sdk/indexify/executor/executor_tasks.py b/python-sdk/indexify/executor/executor_tasks.py index 2bb4a9f1f..511dbca02 100644 --- a/python-sdk/indexify/executor/executor_tasks.py +++ b/python-sdk/indexify/executor/executor_tasks.py @@ -9,6 +9,7 @@ from .api_objects import Task from .downloader import Downloader + class Job(BaseModel): job_name: str job_id: str @@ -20,6 +21,7 @@ class Job(BaseModel): reducer_output_id: Optional[str] = None graph_version: int + def convert_job_to_task(Job): return Task( job_id=Job.job_id, @@ -97,6 +99,7 @@ def __init__( ) self.task = task + # class ExtractTask(asyncio.Future): # def __init__( # self, diff --git a/python-sdk/indexify/executor/function_worker/function_worker.py b/python-sdk/indexify/executor/function_worker/function_worker.py index 6483e0d3c..4056037ef 100644 --- a/python-sdk/indexify/executor/function_worker/function_worker.py +++ b/python-sdk/indexify/executor/function_worker/function_worker.py @@ -94,7 +94,9 @@ def __init__( self._loop = asyncio.get_event_loop() def run_function(self, task, fn_input, init_value, code_path): - return RunFunctionTask(task=task, coroutine=self.async_submit( + return RunFunctionTask( + task=task, + coroutine=self.async_submit( namespace=task.namespace, graph_name=task.compute_graph, fn_name=task.compute_fn, @@ -103,12 +105,11 @@ def run_function(self, task, fn_input, init_value, code_path): code_path=code_path, version=task.graph_version, invocation_id=task.invocation_id, - ), loop=self._loop) + ), + loop=self._loop, + ) - async def async_submit( - self, - **kwargs - ) -> FunctionWorkerOutput: + async def async_submit(self, **kwargs) -> FunctionWorkerOutput: try: print(f"Submitting async function.....") result = await _run_function( diff --git a/python-sdk/indexify/executor/task_reporter.py b/python-sdk/indexify/executor/task_reporter.py index 8c83f4b7b..e31d6235d 100644 --- a/python-sdk/indexify/executor/task_reporter.py +++ b/python-sdk/indexify/executor/task_reporter.py @@ -1,16 +1,15 @@ import asyncio from typing import Optional -from rich.panel import Panel -from rich.text import Text - import nanoid from httpx import Timeout from 
pydantic import BaseModel from rich import print +from rich.panel import Panel +from rich.text import Text -from indexify.console import console from indexify.common_util import get_httpx_client +from indexify.console import console from indexify.executor.api_objects import RouterOutput as ApiRouterOutput from indexify.executor.api_objects import TaskResult from indexify.executor.task_store import CompletedTask, TaskStore @@ -71,7 +70,11 @@ def _log(task_outcome): class TaskReporter: def __init__( - self, base_url: str, executor_id: str, task_store: TaskStore, config_path: Optional[str] = None, + self, + base_url: str, + executor_id: str, + task_store: TaskStore, + config_path: Optional[str] = None, ): self._base_url = base_url self._executor_id = executor_id @@ -79,8 +82,7 @@ def __init__( self._task_store = task_store async def run(self): - console.print( - Text("Starting task completion reporter", style="bold cyan")) + console.print(Text("Starting task completion reporter", style="bold cyan")) # We should copy only the keys and not the values while True: @@ -208,5 +210,3 @@ def report_task_outcome(self, completed_task: CompletedTask): f"[bold]task-reporter[/bold] failed to report task outcome retries={completed_task.reporting_retries} {response.text}" ) raise e - - diff --git a/python-sdk/tests/test_graph_behaviours.py b/python-sdk/tests/test_graph_behaviours.py index 3b4aa7484..8498a3506 100644 --- a/python-sdk/tests/test_graph_behaviours.py +++ b/python-sdk/tests/test_graph_behaviours.py @@ -226,13 +226,19 @@ def remote_or_local_pipeline(pipeline, remote=True): return RemotePipeline.deploy(pipeline) return pipeline + @indexify_function() async def async_simple_function(x: int) -> int: return x * x + class TestGraphBehaviors(unittest.TestCase): def test_async_simple_function(self): - graph = Graph(name="test_async_simple_function", description="test", start_node=async_simple_function) + graph = Graph( + name="test_async_simple_function", + description="test", + start_node=async_simple_function, + ) graph = RemoteGraph.deploy(graph) invocation_id = graph.run(block_until_done=True, x=10) output = graph.output(invocation_id, "async_simple_function") From 59c1767d9a83247217a3f2e3c3e8965f2589f1c6 Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 14:43:56 +0530 Subject: [PATCH 17/25] Remoiving unused objects --- python-sdk/indexify/executor/agent.py | 2 -- python-sdk/indexify/executor/api_objects.py | 5 ----- 2 files changed, 7 deletions(-) diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index c2ba4eabc..6b1a785a4 100644 --- a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -83,7 +83,6 @@ def __init__( f"Require Bootstrap? 
{self._require_image_bootstrap}", style="cyan bold" ) - self.num_workers = num_workers if config_path: console.print("Running the extractor with TLS enabled", style="cyan bold") self._protocol = "https" @@ -106,7 +105,6 @@ def __init__( self._downloader = Downloader( code_path=code_path, base_url=self._base_url, config_path=self._config_path ) - self._max_queued_tasks = 10 self._task_reporter = TaskReporter( base_url=self._base_url, executor_id=self._executor_id, diff --git a/python-sdk/indexify/executor/api_objects.py b/python-sdk/indexify/executor/api_objects.py index 0c4538bde..281d5b3ba 100644 --- a/python-sdk/indexify/executor/api_objects.py +++ b/python-sdk/indexify/executor/api_objects.py @@ -28,11 +28,6 @@ class ExecutorMetadata(BaseModel): class RouterOutput(BaseModel): edges: List[str] - -class FnOutput(BaseModel): - payload: Json - - class TaskResult(BaseModel): router_output: Optional[RouterOutput] = None outcome: str From 82b139a17cb2e3be666bf86247f610849cb3f63a Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 15:19:42 +0530 Subject: [PATCH 18/25] Added Indexify client in downloader.py --- python-sdk/indexify/executor/agent.py | 15 ++++-- python-sdk/indexify/executor/downloader.py | 63 +++++++--------------- python-sdk/indexify/http_client.py | 12 +++++ 3 files changed, 40 insertions(+), 50 deletions(-) diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index 6b1a785a4..f8f4c32a5 100644 --- a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -78,6 +78,7 @@ def __init__( else False ) self._executor_bootstrap_failed = False + self._server_addr = server_addr console.print( f"Require Bootstrap? {self._require_image_bootstrap}", style="cyan bold" @@ -89,21 +90,25 @@ def __init__( else: self._protocol = "http" + self._base_url = f"{self._protocol}://{self._server_addr}" self._task_store: TaskStore = TaskStore() self._executor_id = executor_id console.print("Starting FunctionWorker", style="cyan bold") self._function_worker = FunctionWorker( indexify_client=IndexifyClient( - service_url=f"{self._protocol}://{server_addr}", - config_path=config_path, + service_url=self._base_url, + config_path=self._config_path, ), ) self._has_registered = False - self._server_addr = server_addr - self._base_url = f"{self._protocol}://{self._server_addr}" self._code_path = code_path self._downloader = Downloader( - code_path=code_path, base_url=self._base_url, config_path=self._config_path + indexify_client=IndexifyClient( + service_url=self._base_url, + config_path=self._config_path, + ), + code_path=code_path, + base_url=self._base_url ) self._task_reporter = TaskReporter( base_url=self._base_url, diff --git a/python-sdk/indexify/executor/downloader.py b/python-sdk/indexify/executor/downloader.py index 5b932a18a..29961d566 100644 --- a/python-sdk/indexify/executor/downloader.py +++ b/python-sdk/indexify/executor/downloader.py @@ -36,11 +36,11 @@ def __init__( self, code_path: str, base_url: str, - config_path: Optional[str] = None, + indexify_client: IndexifyClient, ): self.code_path = code_path self.base_url = base_url - self._client = get_httpx_client(config_path) + self._client = indexify_client self._event_loop = asyncio.get_event_loop() def download(self, task, name): @@ -48,8 +48,11 @@ def download(self, task, name): coroutine = self.download_graph( task.namespace, task.compute_graph, task.graph_version ) - else: + elif name == "download_input": coroutine = self.download_input(task) + else: + raise 
Exception("Unsupported task name") + return DownloadTask( task=task, coroutine=coroutine, name=name, loop=self._event_loop ) @@ -67,21 +70,7 @@ async def download_graph(self, namespace: str, name: str, version: int) -> str: ) ) - response = self._client.get( - f"{self.base_url}/internal/namespaces/{namespace}/compute_graphs/{name}/code" - ) - try: - response.raise_for_status() - except httpx.HTTPStatusError as e: - console.print( - Panel( - f"Failed to download graph: {name}\nError: {response.text}", - title="downloader error", - border_style="error", - ) - ) - raise - + response = self._client.download_graph(namespace, name) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "wb") as f: f.write(response.content) @@ -100,25 +89,21 @@ async def download_input(self, task: Task) -> DownloadedInputs: console.print( Panel( - f"downloading input\nURL: {url} \n reducer input URL: {reducer_url}", + f"downloading input\nFunction: {task.compute_fn} \n reducer id: {task.reducer_output_id}", title="downloader", border_style="cyan", ) ) - response = self._client.get(url) - - try: - response.raise_for_status() - except httpx.HTTPStatusError as e: - console.print( - Panel( - f"failed to download input: {task.input_key}\nError: {response.text}", - title="downloader error", - border_style="error", - ) - ) - raise + input_id = task.input_key.split("|")[-1] + if task.invocation_id == input_id: + response = self._client.download_fn_input(task.namespace, task.compute_graph, task.invocation_id) + else: + response = self._client.download_fn_output(task.input_key) + + init_value = None + if task.reducer_output_id: + init_value = self._client.download_reducer_input(task.namespace, task.compute_graph, task.invocation_id, task.compute_fn, task.reducer_output_id) encoder = ( "json" @@ -136,19 +121,7 @@ async def download_input(self, task: Task) -> DownloadedInputs: deserialized_content = serializer.deserialize(response.content) - if reducer_url: - init_value = self._client.get(reducer_url) - try: - init_value.raise_for_status() - except httpx.HTTPStatusError as e: - console.print( - Panel( - f"failed to download reducer output: {task.reducer_output_id}\nError: {init_value.text}", - title="downloader error", - border_style="error", - ) - ) - raise + if init_value: init_value = serializer.deserialize(init_value.content) return DownloadedInputs( input=IndexifyData( diff --git a/python-sdk/indexify/http_client.py b/python-sdk/indexify/http_client.py index b2a080cb4..177768b21 100644 --- a/python-sdk/indexify/http_client.py +++ b/python-sdk/indexify/http_client.py @@ -164,6 +164,18 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, traceback): self.close() + def download_graph(self, namespace: str, compute_graph: str): + return self._get(f"internal/namespaces/{namespace}/compute_graphs/{compute_graph}/code") + + def download_fn_input(self, namespace: str, compute_graph: str, invocation_id: str): + return self._get(f"namespaces/{namespace}/compute_graphs/{compute_graph}/invocations/{invocation_id}/payload") + + def download_fn_output(self, input_key: str): + return self._get(f"internal/fn_outputs/{input_key}") + + def download_reducer_input(self, namespace: str, compute_graph: str, invocation_id: str, compute_fn: str, reducer_output_id: str): + return self._get(f"namespaces/{namespace}/compute_graphs/{compute_graph}/invocations/{invocation_id}/fn/{compute_fn}/output/{reducer_output_id}") + def register_compute_graph(self, graph: Graph, additional_modules): graph_metadata = graph.definition() 
serialized_code = cloudpickle.dumps(graph.serialize(additional_modules)) From 2e63d3df20b8a9570bf12762df9286afbe106150 Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 15:38:39 +0530 Subject: [PATCH 19/25] Deleting unused objects --- python-sdk/indexify/executor/agent.py | 1 - python-sdk/indexify/executor/downloader.py | 4 +- python-sdk/indexify/executor/downloadtask.py | 22 ---- .../indexify/executor/executor_tasks.py | 116 ++---------------- .../function_worker/function_worker.py | 51 +------- .../function_worker/function_worker_utils.py | 42 ++++--- python-sdk/indexify/executor/runtask.py | 21 ---- 7 files changed, 48 insertions(+), 209 deletions(-) delete mode 100644 python-sdk/indexify/executor/downloadtask.py delete mode 100644 python-sdk/indexify/executor/runtask.py diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index f8f4c32a5..462604af9 100644 --- a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -27,7 +27,6 @@ from . import image_dependency_installer from .api_objects import ExecutorMetadata, Task from .downloader import DownloadedInputs, Downloader -from .executor_tasks import DownloadGraphTask, DownloadInputTask, ExtractTask from .runtime_probes import ProbeInfo, RuntimeProbes from .task_reporter import TaskReporter from .task_store import CompletedTask, TaskStore diff --git a/python-sdk/indexify/executor/downloader.py b/python-sdk/indexify/executor/downloader.py index 29961d566..696430280 100644 --- a/python-sdk/indexify/executor/downloader.py +++ b/python-sdk/indexify/executor/downloader.py @@ -2,7 +2,6 @@ import os from typing import Optional -import httpx from pydantic import BaseModel from rich.console import Console from rich.panel import Panel @@ -10,10 +9,9 @@ from indexify.functions_sdk.data_objects import IndexifyData -from ..common_util import get_httpx_client from ..functions_sdk.object_serializer import JsonSerializer, get_serializer from .api_objects import Task -from .downloadtask import DownloadTask +from .executor_tasks import DownloadTask custom_theme = Theme( { diff --git a/python-sdk/indexify/executor/downloadtask.py b/python-sdk/indexify/executor/downloadtask.py deleted file mode 100644 index 3875187b7..000000000 --- a/python-sdk/indexify/executor/downloadtask.py +++ /dev/null @@ -1,22 +0,0 @@ -import asyncio - -from indexify.executor.api_objects import Task - - -class DownloadTask(asyncio.Task): - def __init__( - self, - *, - task: Task, - coroutine, - name, - loop, - **kwargs, - ): - kwargs["name"] = name - kwargs["loop"] = loop - super().__init__( - coroutine, - **kwargs, - ) - self.task = task diff --git a/python-sdk/indexify/executor/executor_tasks.py b/python-sdk/indexify/executor/executor_tasks.py index 511dbca02..408932e5f 100644 --- a/python-sdk/indexify/executor/executor_tasks.py +++ b/python-sdk/indexify/executor/executor_tasks.py @@ -1,129 +1,39 @@ import asyncio -from typing import Optional - -from pydantic import BaseModel - -from indexify.executor.function_worker import FunctionWorker -from indexify.functions_sdk.data_objects import IndexifyData from .api_objects import Task -from .downloader import Downloader - - -class Job(BaseModel): - job_name: str - job_id: str - namespace: str - compute_graph: str - compute_fn: str - invocation_id: str - input_key: str - reducer_output_id: Optional[str] = None - graph_version: int - - -def convert_job_to_task(Job): - return Task( - job_id=Job.job_id, - namespace=Job.namespace, - 
compute_graph=Job.compute_graph, - compute_fn=Job.compute_fn, - invocation_id=Job.invocation_id, - input_key=Job.input_key, - reducer_output_id=Job.reducer_output_id, - graph_version=Job.graph_version, - ) - -class DownloadGraphTask(asyncio.Task): +class RunFunctionTask(asyncio.Task): def __init__( self, *, task: Task, - downloader: Downloader, + coroutine, + loop, **kwargs, ): - kwargs["name"] = "download_graph" - kwargs["loop"] = asyncio.get_event_loop() - super().__init__( - downloader.download_graph( - task.namespace, task.compute_graph, task.graph_version - ), - **kwargs, - ) - self.task = task - - -class DownloadInputTask(asyncio.Task): - def __init__( - self, - *, - task: Task, - downloader: Downloader, - **kwargs, - ): - kwargs["name"] = "download_input" - kwargs["loop"] = asyncio.get_event_loop() + kwargs["name"] = "run_function" + kwargs["loop"] = loop super().__init__( - downloader.download_input(task), + coroutine, **kwargs, ) self.task = task -class ExtractTask(asyncio.Task): +class DownloadTask(asyncio.Task): def __init__( self, *, - function_worker: FunctionWorker, task: Task, - input: IndexifyData, - init_value: Optional[IndexifyData] = None, - code_path: str, + coroutine, + name, + loop, **kwargs, ): - kwargs["name"] = "run_function" - kwargs["loop"] = asyncio.get_event_loop() + kwargs["name"] = name + kwargs["loop"] = loop super().__init__( - function_worker.async_submit( - namespace=task.namespace, - graph_name=task.compute_graph, - fn_name=task.compute_fn, - input=input, - init_value=init_value, - code_path=code_path, - version=task.graph_version, - invocation_id=task.invocation_id, - ), + coroutine, **kwargs, ) self.task = task - - -# class ExtractTask(asyncio.Future): -# def __init__( -# self, -# *, -# function_worker: FunctionWorker, -# task: Task, -# input: IndexifyData, -# init_value: Optional[IndexifyData] = None, -# code_path: str, -# **kwargs, -# ): -# kwargs["name"] = "run_function" -# kwargs["loop"] = asyncio.get_event_loop() -# super().__init__( -# function_worker.async_submit( -# namespace=task.namespace, -# graph_name=task.compute_graph, -# fn_name=task.compute_fn, -# input=input, -# init_value=init_value, -# code_path=code_path, -# version=task.graph_version, -# invocation_id=task.invocation_id, -# ), -# **kwargs, -# ) -# self.task = task diff --git a/python-sdk/indexify/executor/function_worker/function_worker.py b/python-sdk/indexify/executor/function_worker/function_worker.py index 4056037ef..ec573c808 100644 --- a/python-sdk/indexify/executor/function_worker/function_worker.py +++ b/python-sdk/indexify/executor/function_worker/function_worker.py @@ -1,14 +1,14 @@ import asyncio import sys import traceback -from typing import Dict, List, Optional +from typing import List, Optional -import cloudpickle from pydantic import BaseModel from rich import print from indexify import IndexifyClient -from indexify.executor.runtask import RunFunctionTask +from indexify.executor.function_worker.function_worker_utils import _load_function +from indexify.executor.executor_tasks import RunFunctionTask from indexify.functions_sdk.data_objects import ( FunctionWorkerOutput, IndexifyData, @@ -16,13 +16,9 @@ ) from indexify.functions_sdk.indexify_functions import ( FunctionCallResult, - GraphInvocationContext, - IndexifyFunctionWrapper, RouterCallResult, ) - -function_wrapper_map: Dict[str, IndexifyFunctionWrapper] = {} - +from indexify.executor.api_objects import Task class FunctionRunException(Exception): def __init__( @@ -43,37 +39,6 @@ class FunctionOutput(BaseModel): 
stdout: str = "" stderr: str = "" - -def _load_function( - namespace: str, - graph_name: str, - fn_name: str, - code_path: str, - version: int, - invocation_id: str, - indexify_client: IndexifyClient, -): - """Load an extractor to the memory: extractor_wrapper_map.""" - global function_wrapper_map - key = f"{namespace}/{graph_name}/{version}/{fn_name}" - if key in function_wrapper_map: - return - with open(code_path, "rb") as f: - code = f.read() - pickled_functions = cloudpickle.loads(code) - context = GraphInvocationContext( - invocation_id=invocation_id, - graph_name=graph_name, - graph_version=str(version), - indexify_client=indexify_client, - ) - function_wrapper = IndexifyFunctionWrapper( - cloudpickle.loads(pickled_functions[fn_name]), - context, - ) - function_wrapper_map[key] = function_wrapper - - class Job(BaseModel): namespace: str graph_name: str @@ -93,7 +58,7 @@ def __init__( self._indexify_client: IndexifyClient = indexify_client self._loop = asyncio.get_event_loop() - def run_function(self, task, fn_input, init_value, code_path): + def run_function(self, task: Task, fn_input: IndexifyData, init_value: IndexifyData | None, code_path: str): return RunFunctionTask( task=task, coroutine=self.async_submit( @@ -170,9 +135,7 @@ async def _run_function( ) with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture): try: - key = f"{namespace}/{graph_name}/{version}/{fn_name}" - if key not in function_wrapper_map: - _load_function( + fn = _load_function( namespace, graph_name, fn_name, @@ -181,8 +144,6 @@ async def _run_function( invocation_id, indexify_client, ) - - fn = function_wrapper_map[key] if ( str(type(fn.indexify_function)) == "" diff --git a/python-sdk/indexify/executor/function_worker/function_worker_utils.py b/python-sdk/indexify/executor/function_worker/function_worker_utils.py index e244caeaf..f5923def9 100644 --- a/python-sdk/indexify/executor/function_worker/function_worker_utils.py +++ b/python-sdk/indexify/executor/function_worker/function_worker_utils.py @@ -1,18 +1,32 @@ import os +from functools import cache -import nanoid +import cloudpickle +from indexify import IndexifyClient +from indexify.functions_sdk.indexify_functions import GraphInvocationContext, IndexifyFunctionWrapper -def get_optimal_process_count(): - """ - Returns a reasonable number of processes based on CPU cores. - Generally CPU cores - 1 to leave one core for the OS/other tasks. 
- """ - return max(os.cpu_count() - 1, 1) - - -def job_generator() -> str: - """ - Generates job ID - """ - return nanoid.generate() +@cache +def _load_function( + namespace: str, + graph_name: str, + fn_name: str, + code_path: str, + version: int, + invocation_id: str, + indexify_client: IndexifyClient, +): + """Load an extractor to the memory: extractor_wrapper_map.""" + with open(code_path, "rb") as f: + code = f.read() + pickled_functions = cloudpickle.loads(code) + context = GraphInvocationContext( + invocation_id=invocation_id, + graph_name=graph_name, + graph_version=str(version), + indexify_client=indexify_client, + ) + return IndexifyFunctionWrapper( + cloudpickle.loads(pickled_functions[fn_name]), + context, + ) diff --git a/python-sdk/indexify/executor/runtask.py b/python-sdk/indexify/executor/runtask.py deleted file mode 100644 index 52c2cfd03..000000000 --- a/python-sdk/indexify/executor/runtask.py +++ /dev/null @@ -1,21 +0,0 @@ -import asyncio - -from indexify.executor.api_objects import Task - - -class RunFunctionTask(asyncio.Task): - def __init__( - self, - *, - task: Task, - coroutine, - loop, - **kwargs, - ): - kwargs["name"] = "run_function" - kwargs["loop"] = loop - super().__init__( - coroutine, - **kwargs, - ) - self.task = task From ed2a71b8b37c19e6cc488c3b6e381fab8999933c Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 15:39:43 +0530 Subject: [PATCH 20/25] Remove console.py --- python-sdk/indexify/cli.py | 13 ++++++++++++- python-sdk/indexify/console.py | 12 ------------ python-sdk/indexify/executor/task_reporter.py | 13 +++++++++++-- 3 files changed, 23 insertions(+), 15 deletions(-) delete mode 100644 python-sdk/indexify/console.py diff --git a/python-sdk/indexify/cli.py b/python-sdk/indexify/cli.py index 9f4de9e14..41b401e9b 100644 --- a/python-sdk/indexify/cli.py +++ b/python-sdk/indexify/cli.py @@ -10,10 +10,11 @@ import nanoid import typer +from rich.console import Console from rich.panel import Panel from rich.text import Text +from rich.theme import Theme -from indexify.console import console from indexify.executor.agent import ExtractorAgent from indexify.functions_sdk.image import ( DEFAULT_IMAGE_3_10, @@ -21,6 +22,16 @@ Image, ) +custom_theme = Theme( + { + "info": "cyan", + "warning": "yellow", + "error": "red", + "highlight": "magenta", + } +) +console = Console(theme=custom_theme) + app = typer.Typer(pretty_exceptions_enable=False, no_args_is_help=True) diff --git a/python-sdk/indexify/console.py b/python-sdk/indexify/console.py deleted file mode 100644 index d3ce32f19..000000000 --- a/python-sdk/indexify/console.py +++ /dev/null @@ -1,12 +0,0 @@ -from rich.console import Console -from rich.theme import Theme - -custom_theme = Theme( - { - "info": "cyan", - "warning": "yellow", - "error": "red", - "highlight": "magenta", - } -) -console = Console(theme=custom_theme) diff --git a/python-sdk/indexify/executor/task_reporter.py b/python-sdk/indexify/executor/task_reporter.py index e31d6235d..456a57d1e 100644 --- a/python-sdk/indexify/executor/task_reporter.py +++ b/python-sdk/indexify/executor/task_reporter.py @@ -7,9 +7,10 @@ from rich import print from rich.panel import Panel from rich.text import Text +from rich.console import Console +from rich.theme import Theme from indexify.common_util import get_httpx_client -from indexify.console import console from indexify.executor.api_objects import RouterOutput as ApiRouterOutput from indexify.executor.api_objects import TaskResult from indexify.executor.task_store import CompletedTask, 
TaskStore @@ -24,7 +25,15 @@ def __bool__(self): FORCE_MULTIPART = ForceMultipartDict() UTF_8_CONTENT_TYPE = "application/octet-stream" - +custom_theme = Theme( + { + "info": "cyan", + "warning": "yellow", + "error": "red", + "highlight": "magenta", + } +) +console = Console(theme=custom_theme) class ReportingData(BaseModel): output_count: int = 0 From 18fc645804ce0ef55f0cde9b8c5cce8bd6d7b3e1 Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 15:48:22 +0530 Subject: [PATCH 21/25] Refactored task reported --- python-sdk/indexify/executor/agent.py | 2 +- .../executor/task_reporter/__init__.py | 3 ++ .../{ => task_reporter}/task_reporter.py | 37 +------------------ .../task_reporter/task_reporter_utils.py | 36 ++++++++++++++++++ 4 files changed, 42 insertions(+), 36 deletions(-) create mode 100644 python-sdk/indexify/executor/task_reporter/__init__.py rename python-sdk/indexify/executor/{ => task_reporter}/task_reporter.py (85%) create mode 100644 python-sdk/indexify/executor/task_reporter/task_reporter_utils.py diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index 462604af9..d4d3c2c8c 100644 --- a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -28,7 +28,7 @@ from .api_objects import ExecutorMetadata, Task from .downloader import DownloadedInputs, Downloader from .runtime_probes import ProbeInfo, RuntimeProbes -from .task_reporter import TaskReporter +from indexify.executor.task_reporter import TaskReporter from .task_store import CompletedTask, TaskStore custom_theme = Theme( diff --git a/python-sdk/indexify/executor/task_reporter/__init__.py b/python-sdk/indexify/executor/task_reporter/__init__.py new file mode 100644 index 000000000..c2f246faa --- /dev/null +++ b/python-sdk/indexify/executor/task_reporter/__init__.py @@ -0,0 +1,3 @@ +from .task_reporter import TaskReporter + +__all__ = ["TaskReporter"] diff --git a/python-sdk/indexify/executor/task_reporter.py b/python-sdk/indexify/executor/task_reporter/task_reporter.py similarity index 85% rename from python-sdk/indexify/executor/task_reporter.py rename to python-sdk/indexify/executor/task_reporter/task_reporter.py index 456a57d1e..183e34df0 100644 --- a/python-sdk/indexify/executor/task_reporter.py +++ b/python-sdk/indexify/executor/task_reporter/task_reporter.py @@ -5,7 +5,6 @@ from httpx import Timeout from pydantic import BaseModel from rich import print -from rich.panel import Panel from rich.text import Text from rich.console import Console from rich.theme import Theme @@ -13,6 +12,8 @@ from indexify.common_util import get_httpx_client from indexify.executor.api_objects import RouterOutput as ApiRouterOutput from indexify.executor.api_objects import TaskResult +from indexify.executor.task_reporter.task_reporter_utils import _log_exception, \ + _log from indexify.executor.task_store import CompletedTask, TaskStore from indexify.functions_sdk.object_serializer import get_serializer @@ -44,39 +45,6 @@ class ReportingData(BaseModel): stderr_total_bytes: int = 0 -def _log_exception(task_outcome, e): - console.print( - Panel( - f"Failed to report task {task_outcome.task.id}\n" - f"Exception: {type(e).__name__}({e})\n" - f"Retries: {task_outcome.reporting_retries}\n" - "Retrying...", - title="Reporting Error", - border_style="error", - ) - ) - - -def _log(task_outcome): - outcome = task_outcome.task_outcome - style_outcome = ( - f"[bold red] {outcome} [/]" - if "fail" in outcome - else f"[bold green] {outcome} [/]" - ) - console.print( - Panel( - 
f"Reporting outcome of task: {task_outcome.task.id}, function: {task_outcome.task.compute_fn}\n" - f"Outcome: {style_outcome}\n" - f"Num Fn Outputs: {len(task_outcome.outputs or [])}\n" - f"Router Output: {task_outcome.router_output}\n" - f"Retries: {task_outcome.reporting_retries}", - title="Task Completion", - border_style="info", - ) - ) - - class TaskReporter: def __init__( self, @@ -111,7 +79,6 @@ async def run(self): self._task_store.mark_reported(task_id=task_outcome.task.id) def report_task_outcome(self, completed_task: CompletedTask): - report = ReportingData() fn_outputs = [] for output in completed_task.outputs or []: diff --git a/python-sdk/indexify/executor/task_reporter/task_reporter_utils.py b/python-sdk/indexify/executor/task_reporter/task_reporter_utils.py new file mode 100644 index 000000000..d97e9bd5b --- /dev/null +++ b/python-sdk/indexify/executor/task_reporter/task_reporter_utils.py @@ -0,0 +1,36 @@ +from rich.panel import Panel + +from indexify.executor.task_reporter.task_reporter import console + + +def _log_exception(task_outcome, e): + console.print( + Panel( + f"Failed to report task {task_outcome.task.id}\n" + f"Exception: {type(e).__name__}({e})\n" + f"Retries: {task_outcome.reporting_retries}\n" + "Retrying...", + title="Reporting Error", + border_style="error", + ) + ) + + +def _log(task_outcome): + outcome = task_outcome.task_outcome + style_outcome = ( + f"[bold red] {outcome} [/]" + if "fail" in outcome + else f"[bold green] {outcome} [/]" + ) + console.print( + Panel( + f"Reporting outcome of task: {task_outcome.task.id}, function: {task_outcome.task.compute_fn}\n" + f"Outcome: {style_outcome}\n" + f"Num Fn Outputs: {len(task_outcome.outputs or [])}\n" + f"Router Output: {task_outcome.router_output}\n" + f"Retries: {task_outcome.reporting_retries}", + title="Task Completion", + border_style="info", + ) + ) From 130a9c2e2e89bd336f75e0bad33ca9082be03808 Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 15:51:03 +0530 Subject: [PATCH 22/25] linting --- python-sdk/indexify/executor/agent.py | 4 +-- python-sdk/indexify/executor/api_objects.py | 1 + python-sdk/indexify/executor/downloader.py | 13 ++++++-- .../indexify/executor/executor_tasks.py | 1 + .../function_worker/function_worker.py | 32 ++++++++++++------- .../function_worker/function_worker_utils.py | 6 +++- .../executor/task_reporter/task_reporter.py | 17 +++------- .../task_reporter/task_reporter_utils.py | 15 +++++++-- python-sdk/indexify/http_client.py | 21 +++++++++--- 9 files changed, 75 insertions(+), 35 deletions(-) diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index d4d3c2c8c..be3caab68 100644 --- a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -16,6 +16,7 @@ from indexify.common_util import get_httpx_client from indexify.executor.function_worker import FunctionWorker +from indexify.executor.task_reporter import TaskReporter from indexify.functions_sdk.data_objects import ( FunctionWorkerOutput, IndexifyData, @@ -28,7 +29,6 @@ from .api_objects import ExecutorMetadata, Task from .downloader import DownloadedInputs, Downloader from .runtime_probes import ProbeInfo, RuntimeProbes -from indexify.executor.task_reporter import TaskReporter from .task_store import CompletedTask, TaskStore custom_theme = Theme( @@ -107,7 +107,7 @@ def __init__( config_path=self._config_path, ), code_path=code_path, - base_url=self._base_url + base_url=self._base_url, ) self._task_reporter = TaskReporter( 
base_url=self._base_url, diff --git a/python-sdk/indexify/executor/api_objects.py b/python-sdk/indexify/executor/api_objects.py index 281d5b3ba..0198e7f83 100644 --- a/python-sdk/indexify/executor/api_objects.py +++ b/python-sdk/indexify/executor/api_objects.py @@ -28,6 +28,7 @@ class ExecutorMetadata(BaseModel): class RouterOutput(BaseModel): edges: List[str] + class TaskResult(BaseModel): router_output: Optional[RouterOutput] = None outcome: str diff --git a/python-sdk/indexify/executor/downloader.py b/python-sdk/indexify/executor/downloader.py index 696430280..e42b44379 100644 --- a/python-sdk/indexify/executor/downloader.py +++ b/python-sdk/indexify/executor/downloader.py @@ -9,6 +9,7 @@ from indexify.functions_sdk.data_objects import IndexifyData +from .. import IndexifyClient from ..functions_sdk.object_serializer import JsonSerializer, get_serializer from .api_objects import Task from .executor_tasks import DownloadTask @@ -95,13 +96,21 @@ async def download_input(self, task: Task) -> DownloadedInputs: input_id = task.input_key.split("|")[-1] if task.invocation_id == input_id: - response = self._client.download_fn_input(task.namespace, task.compute_graph, task.invocation_id) + response = self._client.download_fn_input( + task.namespace, task.compute_graph, task.invocation_id + ) else: response = self._client.download_fn_output(task.input_key) init_value = None if task.reducer_output_id: - init_value = self._client.download_reducer_input(task.namespace, task.compute_graph, task.invocation_id, task.compute_fn, task.reducer_output_id) + init_value = self._client.download_reducer_input( + task.namespace, + task.compute_graph, + task.invocation_id, + task.compute_fn, + task.reducer_output_id, + ) encoder = ( "json" diff --git a/python-sdk/indexify/executor/executor_tasks.py b/python-sdk/indexify/executor/executor_tasks.py index 408932e5f..ca3055745 100644 --- a/python-sdk/indexify/executor/executor_tasks.py +++ b/python-sdk/indexify/executor/executor_tasks.py @@ -2,6 +2,7 @@ from .api_objects import Task + class RunFunctionTask(asyncio.Task): def __init__( self, diff --git a/python-sdk/indexify/executor/function_worker/function_worker.py b/python-sdk/indexify/executor/function_worker/function_worker.py index ec573c808..d12b6e885 100644 --- a/python-sdk/indexify/executor/function_worker/function_worker.py +++ b/python-sdk/indexify/executor/function_worker/function_worker.py @@ -7,8 +7,11 @@ from rich import print from indexify import IndexifyClient -from indexify.executor.function_worker.function_worker_utils import _load_function +from indexify.executor.api_objects import Task from indexify.executor.executor_tasks import RunFunctionTask +from indexify.executor.function_worker.function_worker_utils import ( + _load_function, +) from indexify.functions_sdk.data_objects import ( FunctionWorkerOutput, IndexifyData, @@ -18,7 +21,7 @@ FunctionCallResult, RouterCallResult, ) -from indexify.executor.api_objects import Task + class FunctionRunException(Exception): def __init__( @@ -39,6 +42,7 @@ class FunctionOutput(BaseModel): stdout: str = "" stderr: str = "" + class Job(BaseModel): namespace: str graph_name: str @@ -58,7 +62,13 @@ def __init__( self._indexify_client: IndexifyClient = indexify_client self._loop = asyncio.get_event_loop() - def run_function(self, task: Task, fn_input: IndexifyData, init_value: IndexifyData | None, code_path: str): + def run_function( + self, + task: Task, + fn_input: IndexifyData, + init_value: IndexifyData | None, + code_path: str, + ): return RunFunctionTask( 
task=task, coroutine=self.async_submit( @@ -136,14 +146,14 @@ async def _run_function( with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture): try: fn = _load_function( - namespace, - graph_name, - fn_name, - code_path, - version, - invocation_id, - indexify_client, - ) + namespace, + graph_name, + fn_name, + code_path, + version, + invocation_id, + indexify_client, + ) if ( str(type(fn.indexify_function)) == "" diff --git a/python-sdk/indexify/executor/function_worker/function_worker_utils.py b/python-sdk/indexify/executor/function_worker/function_worker_utils.py index f5923def9..8cfca4b43 100644 --- a/python-sdk/indexify/executor/function_worker/function_worker_utils.py +++ b/python-sdk/indexify/executor/function_worker/function_worker_utils.py @@ -4,7 +4,11 @@ import cloudpickle from indexify import IndexifyClient -from indexify.functions_sdk.indexify_functions import GraphInvocationContext, IndexifyFunctionWrapper +from indexify.functions_sdk.indexify_functions import ( + GraphInvocationContext, + IndexifyFunctionWrapper, +) + @cache def _load_function( diff --git a/python-sdk/indexify/executor/task_reporter/task_reporter.py b/python-sdk/indexify/executor/task_reporter/task_reporter.py index 183e34df0..06bb587a2 100644 --- a/python-sdk/indexify/executor/task_reporter/task_reporter.py +++ b/python-sdk/indexify/executor/task_reporter/task_reporter.py @@ -6,14 +6,14 @@ from pydantic import BaseModel from rich import print from rich.text import Text -from rich.console import Console -from rich.theme import Theme from indexify.common_util import get_httpx_client from indexify.executor.api_objects import RouterOutput as ApiRouterOutput from indexify.executor.api_objects import TaskResult -from indexify.executor.task_reporter.task_reporter_utils import _log_exception, \ - _log +from indexify.executor.task_reporter.task_reporter_utils import ( + _log, + _log_exception, console, +) from indexify.executor.task_store import CompletedTask, TaskStore from indexify.functions_sdk.object_serializer import get_serializer @@ -26,15 +26,6 @@ def __bool__(self): FORCE_MULTIPART = ForceMultipartDict() UTF_8_CONTENT_TYPE = "application/octet-stream" -custom_theme = Theme( - { - "info": "cyan", - "warning": "yellow", - "error": "red", - "highlight": "magenta", - } -) -console = Console(theme=custom_theme) class ReportingData(BaseModel): output_count: int = 0 diff --git a/python-sdk/indexify/executor/task_reporter/task_reporter_utils.py b/python-sdk/indexify/executor/task_reporter/task_reporter_utils.py index d97e9bd5b..dddb8ce3f 100644 --- a/python-sdk/indexify/executor/task_reporter/task_reporter_utils.py +++ b/python-sdk/indexify/executor/task_reporter/task_reporter_utils.py @@ -1,6 +1,6 @@ +from rich.console import Console from rich.panel import Panel - -from indexify.executor.task_reporter.task_reporter import console +from rich.theme import Theme def _log_exception(task_outcome, e): @@ -34,3 +34,14 @@ def _log(task_outcome): border_style="info", ) ) + + +custom_theme = Theme( + { + "info": "cyan", + "warning": "yellow", + "error": "red", + "highlight": "magenta", + } +) +console = Console(theme=custom_theme) diff --git a/python-sdk/indexify/http_client.py b/python-sdk/indexify/http_client.py index 177768b21..467357c41 100644 --- a/python-sdk/indexify/http_client.py +++ b/python-sdk/indexify/http_client.py @@ -165,16 +165,29 @@ def __exit__(self, exc_type, exc_value, traceback): self.close() def download_graph(self, namespace: str, compute_graph: str): - return 
self._get(f"internal/namespaces/{namespace}/compute_graphs/{compute_graph}/code") + return self._get( + f"internal/namespaces/{namespace}/compute_graphs/{compute_graph}/code" + ) def download_fn_input(self, namespace: str, compute_graph: str, invocation_id: str): - return self._get(f"namespaces/{namespace}/compute_graphs/{compute_graph}/invocations/{invocation_id}/payload") + return self._get( + f"namespaces/{namespace}/compute_graphs/{compute_graph}/invocations/{invocation_id}/payload" + ) def download_fn_output(self, input_key: str): return self._get(f"internal/fn_outputs/{input_key}") - def download_reducer_input(self, namespace: str, compute_graph: str, invocation_id: str, compute_fn: str, reducer_output_id: str): - return self._get(f"namespaces/{namespace}/compute_graphs/{compute_graph}/invocations/{invocation_id}/fn/{compute_fn}/output/{reducer_output_id}") + def download_reducer_input( + self, + namespace: str, + compute_graph: str, + invocation_id: str, + compute_fn: str, + reducer_output_id: str, + ): + return self._get( + f"namespaces/{namespace}/compute_graphs/{compute_graph}/invocations/{invocation_id}/fn/{compute_fn}/output/{reducer_output_id}" + ) def register_compute_graph(self, graph: Graph, additional_modules): graph_metadata = graph.definition() From a87bd5237f466f9b892cd6084251b72a9ba26c4c Mon Sep 17 00:00:00 2001 From: Default28 Date: Thu, 21 Nov 2024 17:52:03 +0530 Subject: [PATCH 23/25] Refactoring agent.py --- python-sdk/indexify/executor/agent.py | 282 +++++++++--------- python-sdk/indexify/executor/downloader.py | 25 +- .../indexify/executor/executor_tasks.py | 33 +- .../function_worker/function_worker.py | 4 - .../executor/task_reporter/task_reporter.py | 6 +- 5 files changed, 180 insertions(+), 170 deletions(-) diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index be3caab68..65d45e3d5 100644 --- a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -28,6 +28,7 @@ from . import image_dependency_installer from .api_objects import ExecutorMetadata, Task from .downloader import DownloadedInputs, Downloader +from .executor_tasks import DownloadTask, RunFunctionTask, TaskEnum from .runtime_probes import ProbeInfo, RuntimeProbes from .task_store import CompletedTask, TaskStore @@ -64,7 +65,8 @@ def __init__( image_version: Optional[int] = None, ): event_loop = asyncio.get_event_loop() - event_loop.set_default_executor(ThreadPoolExecutor(max_workers=num_workers)) + self._thread_pool = ThreadPoolExecutor(max_workers=num_workers) + event_loop.set_default_executor(self._thread_pool) self.name_alias = name_alias self.image_version = image_version self._config_path = config_path @@ -117,12 +119,13 @@ def __init__( ) async def task_launcher(self): - async_tasks: List[asyncio.Task | asyncio.Future] = [] + async_tasks: List[asyncio.Task] = [] fn_queue: List[FunctionInput] = [] async_tasks.append( asyncio.create_task( - self._task_store.get_runnable_tasks(), name="get_runnable_tasks" + self._task_store.get_runnable_tasks(), + name=TaskEnum.GET_RUNNABLE_TASK.value, ) ) @@ -132,40 +135,12 @@ async def task_launcher(self): task: Task = self._task_store.get_task(fn.task_id) if self._executor_bootstrap_failed: - completed_task = CompletedTask( - task=task, - outputs=[], - task_outcome="failure", - ) - self._task_store.complete(outcome=completed_task) - + self.mark_task_as_failed(task) continue # Bootstrap this executor. Fail the task if we can't. 
if self._require_image_bootstrap: - try: - image_info = await _get_image_info_for_compute_graph( - task, self._protocol, self._server_addr, self._config_path - ) - image_dependency_installer.executor_image_builder( - image_info, self.name_alias, self.image_version - ) - self._require_image_bootstrap = False - except Exception as e: - console.print( - Text("Failed to bootstrap the executor ", style="red bold") - + Text(f"Exception: {traceback.format_exc()}", style="red") - ) - - self._executor_bootstrap_failed = True - - completed_task = CompletedTask( - task=task, - outputs=[], - task_outcome="failure", - ) - self._task_store.complete(outcome=completed_task) - + if not self._try_bootstrap(task): continue code_path = f"{self._code_path}/{task.namespace}/{task.compute_graph}.{task.graph_version}" @@ -173,13 +148,6 @@ async def task_launcher(self): self._function_worker.run_function( task, fn.input, fn.init_value, code_path ) - # ExtractTask( - # function_worker=self._function_worker, - # task=task, - # input=fn.input, - # code_path=f"{self._code_path}/{task.namespace}/{task.compute_graph}.{task.graph_version}", - # init_value=fn.init_value, - # ) ) fn_queue = [] @@ -189,121 +157,139 @@ async def task_launcher(self): async_tasks: List[asyncio.Task] = list(pending) for async_task in done: - if async_task.get_name() == "get_runnable_tasks": - if async_task.exception(): - console.print( - Text("Task Launcher Error: ", style="red bold") - + Text( - f"Failed to get runnable tasks: {async_task.exception()}", - style="red", + task_name = TaskEnum.from_value(async_task.get_name()) + match task_name: + case TaskEnum.GET_RUNNABLE_TASK: + if async_task.exception(): + self._console_log_exception(async_task) + continue + result: Dict[str, Task] = await async_task + task: Task + for _, task in result.items(): + async_tasks.append( + self._downloader.download( + task, TaskEnum.DOWNLOAD_GRAPH_TASK + ) ) - ) - continue - result: Dict[str, Task] = await async_task - task: Task - for _, task in result.items(): async_tasks.append( - self._downloader.download(task, "download_graph") - ) - async_tasks.append( - asyncio.create_task( - self._task_store.get_runnable_tasks(), - name="get_runnable_tasks", - ) - ) - elif async_task.get_name() == "download_graph": - if async_task.exception(): - console.print( - Text( - f"Failed to download graph for task {async_task.task.id}\n", - style="red bold", + asyncio.create_task( + self._task_store.get_runnable_tasks(), + name=TaskEnum.GET_RUNNABLE_TASK.value, ) - + Text(f"Exception: {async_task.exception()}", style="red") - ) - completed_task = CompletedTask( - task=async_task.task, - outputs=[], - task_outcome="failure", ) - self._task_store.complete(outcome=completed_task) - continue - async_tasks.append( - self._downloader.download(async_task.task, "download_input") - ) - elif async_task.get_name() == "download_input": - if async_task.exception(): - console.print( - Text( - f"Failed to download input for task {async_task.task.id}\n", - style="red bold", + case TaskEnum.DOWNLOAD_GRAPH_TASK: + async_task: DownloadTask + if async_task.exception(): + self._console_log_exception(async_task) + self.mark_task_as_failed(async_task.task) + continue + async_tasks.append( + self._downloader.download( + async_task.task, TaskEnum.DOWNLOAD_INPUT_TASK ) - + Text(f"Exception: {async_task.exception()}", style="red") - ) - completed_task = CompletedTask( - task=async_task.task, - outputs=[], - task_outcome="failure", - ) - self._task_store.complete(outcome=completed_task) - continue - 
downloaded_inputs: DownloadedInputs = await async_task - task: Task = async_task.task - fn_queue.append( - FunctionInput( - task_id=task.id, - namespace=task.namespace, - compute_graph=task.compute_graph, - function=task.compute_fn, - input=downloaded_inputs.input, - init_value=downloaded_inputs.init_value, ) - ) - elif async_task.get_name() == "run_function": - if async_task.exception(): - completed_task = CompletedTask( - task=async_task.task, - task_outcome="failure", - outputs=[], - stderr=str(async_task.exception()), - ) - self._task_store.complete(outcome=completed_task) - continue - async_task: ExtractTask - try: - outputs: FunctionWorkerOutput = await async_task - if not outputs.success: - task_outcome = "failure" - else: - task_outcome = "success" - - completed_task = CompletedTask( - task=async_task.task, - task_outcome=task_outcome, - outputs=outputs.fn_outputs, - router_output=outputs.router_output, - stdout=outputs.stdout, - stderr=outputs.stderr, - reducer=outputs.reducer, - ) - self._task_store.complete(outcome=completed_task) - except BrokenProcessPool: - self._task_store.retriable_failure(async_task.task.id) - continue - except Exception as e: - console.print( - Text( - f"Failed to execute task {async_task.task.id}\n", - style="red bold", + case TaskEnum.DOWNLOAD_INPUT_TASK: + async_task: DownloadTask + if async_task.exception(): + self._console_log_exception(async_task) + self.mark_task_as_failed(async_task.task) + continue + downloaded_inputs: DownloadedInputs = await async_task + task: Task = async_task.task + fn_queue.append( + FunctionInput( + task_id=task.id, + namespace=task.namespace, + compute_graph=task.compute_graph, + function=task.compute_fn, + input=downloaded_inputs.input, + init_value=downloaded_inputs.init_value, ) - + Text(f"Exception: {e}", style="red") ) - completed_task = CompletedTask( - task=async_task.task, - task_outcome="failure", - outputs=[], + case TaskEnum.RUN_FUNCTION_TASK: + async_task: RunFunctionTask + if async_task.exception(): + self.mark_task_as_failed( + async_task.task, str(async_task.exception()) + ) + continue + try: + outputs: FunctionWorkerOutput = await async_task + if not outputs.success: + task_outcome = "failure" + else: + task_outcome = "success" + + completed_task = CompletedTask( + task=async_task.task, + task_outcome=task_outcome, + outputs=outputs.fn_outputs, + router_output=outputs.router_output, + stdout=outputs.stdout, + stderr=outputs.stderr, + reducer=outputs.reducer, + ) + self._task_store.complete(outcome=completed_task) + except BrokenProcessPool: + self._task_store.retriable_failure(async_task.task.id) + continue + except Exception as e: + console.print( + Text( + f"Failed to execute task {async_task.task.id}\n", + style="red bold", + ) + + Text(f"Exception: {e}", style="red") + ) + completed_task = CompletedTask( + task=async_task.task, + task_outcome="failure", + outputs=[], + ) + self._task_store.complete(outcome=completed_task) + continue + case _: + raise ValueError( + f"'{async_task.get_name()}' is not a valid task name." 
) - self._task_store.complete(outcome=completed_task) - continue + + def _console_log_exception(self, async_task: asyncio.Task): + console.print( + Text("Task Launcher Error: ", style="red bold") + + Text( + f"Failed to get runnable tasks: {async_task.exception()}", + style="red", + ) + ) + + def mark_task_as_failed(self, task: Task, stderr: str = None): + completed_task = CompletedTask( + task=task, + outputs=[], + task_outcome="failure", + stderr=stderr, + ) + self._task_store.complete(outcome=completed_task) + + def _try_bootstrap(self, task: Task) -> bool: + try: + image_info = _get_image_info_for_compute_graph( + task, self._protocol, self._server_addr, self._config_path + ) + image_dependency_installer.executor_image_builder( + image_info, self.name_alias, self.image_version + ) + self._require_image_bootstrap = False + return True + except Exception as e: + console.print( + Text("Failed to bootstrap the executor ", style="red bold") + + Text(f"Exception: {traceback.format_exc()}", style="red") + ) + + self._executor_bootstrap_failed = True + self.mark_task_as_failed(task) + return False async def run(self): console.print("Starting Extractor Agent...", style="green") @@ -395,15 +381,15 @@ def to_sentence_case(snake_str): async def _shutdown(self, loop): console.print(Text("shutting down agent...", style="bold yellow")) self._should_run = False + self._thread_pool.shutdown(cancel_futures=True) for task in asyncio.all_tasks(loop): task.cancel() def shutdown(self, loop): - self._function_worker.shutdown() loop.create_task(self._shutdown(loop)) -async def _get_image_info_for_compute_graph( +def _get_image_info_for_compute_graph( task: Task, protocol, server_addr, config_path: str ) -> ImageInformation: namespace = task.namespace diff --git a/python-sdk/indexify/executor/downloader.py b/python-sdk/indexify/executor/downloader.py index e42b44379..91a746e2f 100644 --- a/python-sdk/indexify/executor/downloader.py +++ b/python-sdk/indexify/executor/downloader.py @@ -12,7 +12,7 @@ from .. 
import IndexifyClient from ..functions_sdk.object_serializer import JsonSerializer, get_serializer from .api_objects import Task -from .executor_tasks import DownloadTask +from .executor_tasks import DownloadTask, TaskEnum custom_theme = Theme( { @@ -39,18 +39,18 @@ def __init__( ): self.code_path = code_path self.base_url = base_url - self._client = indexify_client + self._indexify_client = indexify_client self._event_loop = asyncio.get_event_loop() def download(self, task, name): - if name == "download_graph": + if name == TaskEnum.DOWNLOAD_GRAPH_TASK: coroutine = self.download_graph( task.namespace, task.compute_graph, task.graph_version ) - elif name == "download_input": + elif name == TaskEnum.DOWNLOAD_INPUT_TASK: coroutine = self.download_input(task) else: - raise Exception("Unsupported task name") + raise ValueError(f"Unsupported task name: {name}") return DownloadTask( task=task, coroutine=coroutine, name=name, loop=self._event_loop @@ -69,10 +69,17 @@ async def download_graph(self, namespace: str, name: str, version: int) -> str: ) ) - response = self._client.download_graph(namespace, name) + response = self._indexify_client.download_graph(namespace, name) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "wb") as f: f.write(response.content) + console.print( + Panel( + f"Graph Downloaded", + title="downloader", + border_style="cyan", + ) + ) return path async def download_input(self, task: Task) -> DownloadedInputs: @@ -96,15 +103,15 @@ async def download_input(self, task: Task) -> DownloadedInputs: input_id = task.input_key.split("|")[-1] if task.invocation_id == input_id: - response = self._client.download_fn_input( + response = self._indexify_client.download_fn_input( task.namespace, task.compute_graph, task.invocation_id ) else: - response = self._client.download_fn_output(task.input_key) + response = self._indexify_client.download_fn_output(task.input_key) init_value = None if task.reducer_output_id: - init_value = self._client.download_reducer_input( + init_value = self._indexify_client.download_reducer_input( task.namespace, task.compute_graph, task.invocation_id, diff --git a/python-sdk/indexify/executor/executor_tasks.py b/python-sdk/indexify/executor/executor_tasks.py index ca3055745..928aef977 100644 --- a/python-sdk/indexify/executor/executor_tasks.py +++ b/python-sdk/indexify/executor/executor_tasks.py @@ -1,4 +1,6 @@ import asyncio +from enum import Enum, unique +from typing import Coroutine from .api_objects import Task @@ -8,11 +10,11 @@ def __init__( self, *, task: Task, - coroutine, - loop, + coroutine: Coroutine, + loop: asyncio.AbstractEventLoop, **kwargs, ): - kwargs["name"] = "run_function" + kwargs["name"] = TaskEnum.RUN_FUNCTION_TASK.value kwargs["loop"] = loop super().__init__( coroutine, @@ -26,15 +28,32 @@ def __init__( self, *, task: Task, - coroutine, - name, - loop, + coroutine: Coroutine, + name: str, + loop: asyncio.AbstractEventLoop, **kwargs, ): - kwargs["name"] = name + if not isinstance(name, TaskEnum): + raise ValueError(f"name '{name}' must be TaskEnum") + kwargs["name"] = name.value kwargs["loop"] = loop super().__init__( coroutine, **kwargs, ) self.task = task + + +@unique +class TaskEnum(Enum): + GET_RUNNABLE_TASK = "get_runnable_tasks" + RUN_FUNCTION_TASK = "run_function" + DOWNLOAD_GRAPH_TASK = "download_graph" + DOWNLOAD_INPUT_TASK = "download_input" + + @classmethod + def from_value(cls, value): + for task_name in cls: + if task_name.value == value: + return task_name + raise ValueError(f"No task found with value 
{value}") diff --git a/python-sdk/indexify/executor/function_worker/function_worker.py b/python-sdk/indexify/executor/function_worker/function_worker.py index d12b6e885..b5df9d277 100644 --- a/python-sdk/indexify/executor/function_worker/function_worker.py +++ b/python-sdk/indexify/executor/function_worker/function_worker.py @@ -115,10 +115,6 @@ async def async_submit(self, **kwargs) -> FunctionWorkerOutput: success=False, ) - def shutdown(self): - # kill everything - self._loop.stop() - async def _run_function( namespace: str, diff --git a/python-sdk/indexify/executor/task_reporter/task_reporter.py b/python-sdk/indexify/executor/task_reporter/task_reporter.py index 06bb587a2..1b786272a 100644 --- a/python-sdk/indexify/executor/task_reporter/task_reporter.py +++ b/python-sdk/indexify/executor/task_reporter/task_reporter.py @@ -12,7 +12,8 @@ from indexify.executor.api_objects import TaskResult from indexify.executor.task_reporter.task_reporter_utils import ( _log, - _log_exception, console, + _log_exception, + console, ) from indexify.executor.task_store import CompletedTask, TaskStore from indexify.functions_sdk.object_serializer import get_serializer @@ -27,6 +28,7 @@ def __bool__(self): FORCE_MULTIPART = ForceMultipartDict() UTF_8_CONTENT_TYPE = "application/octet-stream" + class ReportingData(BaseModel): output_count: int = 0 output_total_bytes: int = 0 @@ -50,7 +52,7 @@ def __init__( self._task_store = task_store async def run(self): - console.print(Text("Starting task completion reporter", style="bold cyan")) + console.print(Text("Starting TaskReporter", style="bold cyan")) # We should copy only the keys and not the values while True: From 624303356a610688e486602f78f7e11c95671b71 Mon Sep 17 00:00:00 2001 From: Default28 Date: Wed, 27 Nov 2024 16:56:33 +0530 Subject: [PATCH 24/25] Removing unused objects and fixing log_exception method --- python-sdk/indexify/executor/agent.py | 57 +++++++++---------- python-sdk/indexify/executor/downloader.py | 7 --- .../function_worker/function_worker.py | 12 ---- 3 files changed, 28 insertions(+), 48 deletions(-) diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index 65d45e3d5..66635de8a 100644 --- a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -94,7 +94,6 @@ def __init__( self._base_url = f"{self._protocol}://{self._server_addr}" self._task_store: TaskStore = TaskStore() self._executor_id = executor_id - console.print("Starting FunctionWorker", style="cyan bold") self._function_worker = FunctionWorker( indexify_client=IndexifyClient( service_url=self._base_url, @@ -135,7 +134,7 @@ async def task_launcher(self): task: Task = self._task_store.get_task(fn.task_id) if self._executor_bootstrap_failed: - self.mark_task_as_failed(task) + self._mark_task_as_failed(task) continue # Bootstrap this executor. Fail the task if we can't. 
@@ -161,7 +160,10 @@ async def task_launcher(self): match task_name: case TaskEnum.GET_RUNNABLE_TASK: if async_task.exception(): - self._console_log_exception(async_task) + self._console_log_exception( + "Task Launcher Error:", + f"Failed to get runnable tasks: {async_task.exception()}" + ) continue result: Dict[str, Task] = await async_task task: Task @@ -180,8 +182,11 @@ async def task_launcher(self): case TaskEnum.DOWNLOAD_GRAPH_TASK: async_task: DownloadTask if async_task.exception(): - self._console_log_exception(async_task) - self.mark_task_as_failed(async_task.task) + self._console_log_exception( + f"Failed to download graph for task {async_task.task.id}\n", + f"Exception: {async_task.exception()}" + ) + self._mark_task_as_failed(async_task.task) continue async_tasks.append( self._downloader.download( @@ -191,8 +196,11 @@ async def task_launcher(self): case TaskEnum.DOWNLOAD_INPUT_TASK: async_task: DownloadTask if async_task.exception(): - self._console_log_exception(async_task) - self.mark_task_as_failed(async_task.task) + self._console_log_exception( + f"Failed to download input for task {async_task.task.id}\n", + f"Exception: {async_task.exception()}" + ) + self._mark_task_as_failed(async_task.task) continue downloaded_inputs: DownloadedInputs = await async_task task: Task = async_task.task @@ -209,7 +217,7 @@ async def task_launcher(self): case TaskEnum.RUN_FUNCTION_TASK: async_task: RunFunctionTask if async_task.exception(): - self.mark_task_as_failed( + self._mark_task_as_failed( async_task.task, str(async_task.exception()) ) continue @@ -234,35 +242,26 @@ async def task_launcher(self): self._task_store.retriable_failure(async_task.task.id) continue except Exception as e: - console.print( - Text( - f"Failed to execute task {async_task.task.id}\n", - style="red bold", - ) - + Text(f"Exception: {e}", style="red") + self._console_log_exception( + f"Failed to execute task {async_task.task.id}\n", + f"Exception: {e}" ) - completed_task = CompletedTask( - task=async_task.task, - task_outcome="failure", - outputs=[], + self._mark_task_as_failed( + async_task.task, str(e) ) - self._task_store.complete(outcome=completed_task) continue case _: raise ValueError( f"'{async_task.get_name()}' is not a valid task name." 
) - def _console_log_exception(self, async_task: asyncio.Task): - console.print( - Text("Task Launcher Error: ", style="red bold") - + Text( - f"Failed to get runnable tasks: {async_task.exception()}", - style="red", - ) - ) + def _console_log_exception(self, *args: str): + errorMessage = None + for arg in args: + error_message = Text(arg) if errorMessage is None else error_message + arg + console.print(Text(error_message, style="red bold")) - def mark_task_as_failed(self, task: Task, stderr: str = None): + def _mark_task_as_failed(self, task: Task, stderr: str = None): completed_task = CompletedTask( task=task, outputs=[], @@ -288,7 +287,7 @@ def _try_bootstrap(self, task: Task) -> bool: ) self._executor_bootstrap_failed = True - self.mark_task_as_failed(task) + self._mark_task_as_failed(task) return False async def run(self): diff --git a/python-sdk/indexify/executor/downloader.py b/python-sdk/indexify/executor/downloader.py index 91a746e2f..b8b36763a 100644 --- a/python-sdk/indexify/executor/downloader.py +++ b/python-sdk/indexify/executor/downloader.py @@ -73,13 +73,6 @@ async def download_graph(self, namespace: str, name: str, version: int) -> str: os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "wb") as f: f.write(response.content) - console.print( - Panel( - f"Graph Downloaded", - title="downloader", - border_style="cyan", - ) - ) return path async def download_input(self, task: Task) -> DownloadedInputs: diff --git a/python-sdk/indexify/executor/function_worker/function_worker.py b/python-sdk/indexify/executor/function_worker/function_worker.py index b5df9d277..e37ef3bd1 100644 --- a/python-sdk/indexify/executor/function_worker/function_worker.py +++ b/python-sdk/indexify/executor/function_worker/function_worker.py @@ -42,18 +42,6 @@ class FunctionOutput(BaseModel): stdout: str = "" stderr: str = "" - -class Job(BaseModel): - namespace: str - graph_name: str - fn_name: str - input: IndexifyData - code_path: str - version: int - init_value: Optional[IndexifyData] = None - invocation_id: Optional[str] = None - - class FunctionWorker: def __init__( self, From 55154e832e74fd8a15c823102b1b1dc26d9a3b96 Mon Sep 17 00:00:00 2001 From: Default28 Date: Wed, 27 Nov 2024 17:02:24 +0530 Subject: [PATCH 25/25] Added IndexifyClient in task_reporter.py --- python-sdk/indexify/executor/agent.py | 6 ++++-- .../indexify/executor/task_reporter/task_reporter.py | 8 +++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index 66635de8a..26006c963 100644 --- a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -113,8 +113,10 @@ def __init__( self._task_reporter = TaskReporter( base_url=self._base_url, executor_id=self._executor_id, - config_path=self._config_path, - task_store=self._task_store, + indexify_client=IndexifyClient( + service_url=self._base_url, + config_path=self._config_path, + ), ) async def task_launcher(self): diff --git a/python-sdk/indexify/executor/task_reporter/task_reporter.py b/python-sdk/indexify/executor/task_reporter/task_reporter.py index 1b786272a..cd6e9c4cc 100644 --- a/python-sdk/indexify/executor/task_reporter/task_reporter.py +++ b/python-sdk/indexify/executor/task_reporter/task_reporter.py @@ -7,7 +7,7 @@ from rich import print from rich.text import Text -from indexify.common_util import get_httpx_client +from indexify import IndexifyClient from indexify.executor.api_objects import RouterOutput as ApiRouterOutput from 
indexify.executor.api_objects import TaskResult from indexify.executor.task_reporter.task_reporter_utils import ( @@ -43,13 +43,11 @@ def __init__( self, base_url: str, executor_id: str, - task_store: TaskStore, - config_path: Optional[str] = None, + indexify_client: IndexifyClient, ): self._base_url = base_url self._executor_id = executor_id - self._client = get_httpx_client(config_path) - self._task_store = task_store + self._client = indexify_client async def run(self): console.print(Text("Starting TaskReporter", style="bold cyan"))
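
A minimal sketch (not part of the patch series) of the task-name dispatch that the executor_tasks.py/agent.py changes above introduce: asyncio tasks are named with a `TaskEnum` value, and the launcher maps a finished task back to its enum via `TaskEnum.from_value` before matching on it. The `_noop` coroutine and the demo flow below are illustrative assumptions, not SDK code; only the enum names and `from_value` mirror the diff.

```python
import asyncio
from enum import Enum, unique


@unique
class TaskEnum(Enum):
    # Mirrors the names used by executor_tasks.py in this patch series.
    GET_RUNNABLE_TASK = "get_runnable_tasks"
    RUN_FUNCTION_TASK = "run_function"
    DOWNLOAD_GRAPH_TASK = "download_graph"
    DOWNLOAD_INPUT_TASK = "download_input"

    @classmethod
    def from_value(cls, value: str) -> "TaskEnum":
        for task_name in cls:
            if task_name.value == value:
                return task_name
        raise ValueError(f"No task found with value {value}")


async def _noop() -> str:
    # Stand-in coroutine; the real executor awaits a download or a function run here.
    return "ok"


async def main() -> None:
    # Name the asyncio.Task with the enum's value, as RunFunctionTask/DownloadTask do.
    pending = {asyncio.create_task(_noop(), name=TaskEnum.DOWNLOAD_GRAPH_TASK.value)}
    done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
    for finished in done:
        match TaskEnum.from_value(finished.get_name()):
            case TaskEnum.DOWNLOAD_GRAPH_TASK:
                print("graph downloaded:", finished.result())
            case _:
                print("unhandled task:", finished.get_name())


if __name__ == "__main__":
    asyncio.run(main())
```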