diff --git a/python-sdk/indexify/executor/agent.py b/python-sdk/indexify/executor/agent.py index e19051126..6a53d7bf6 100644 --- a/python-sdk/indexify/executor/agent.py +++ b/python-sdk/indexify/executor/agent.py @@ -89,6 +89,7 @@ def __init__( self._task_store: TaskStore = TaskStore() self._executor_id = executor_id + console.print("Starting FunctionWorker", style="cyan bold") self._function_worker = FunctionWorker( workers=num_workers, indexify_client=IndexifyClient( @@ -360,6 +361,7 @@ async def task_launcher(self): continue async def run(self): + console.print("Starting Extractor Agent...", style="green") import signal asyncio.get_event_loop().add_signal_handler( @@ -376,6 +378,7 @@ def to_sentence_case(snake_str): words = snake_str.split("_") return words[0].capitalize() + "" + " ".join(words[1:]) + console.print("Starting Probe....") runtime_probe: ProbeInfo = self._probe.probe() executor_version = version("indexify") diff --git a/python-sdk/indexify/executor/executor_tasks.py b/python-sdk/indexify/executor/executor_tasks.py index 3d59cf0b7..372c196bc 100644 --- a/python-sdk/indexify/executor/executor_tasks.py +++ b/python-sdk/indexify/executor/executor_tasks.py @@ -71,3 +71,31 @@ def __init__( **kwargs, ) self.task = task + +class ExtractTask(asyncio.Future): + def __init__( + self, + *, + function_worker: FunctionWorker, + task: Task, + input: IndexifyData, + init_value: Optional[IndexifyData] = None, + code_path: str, + **kwargs, + ): + kwargs["name"] = "run_function" + kwargs["loop"] = asyncio.get_event_loop() + super().__init__( + function_worker.async_submit( + namespace=task.namespace, + graph_name=task.compute_graph, + fn_name=task.compute_fn, + input=input, + init_value=init_value, + code_path=code_path, + version=task.graph_version, + invocation_id=task.invocation_id, + ), + **kwargs, + ) + self.task = task diff --git a/python-sdk/indexify/executor/function_worker/function_worker.py b/python-sdk/indexify/executor/function_worker/function_worker.py index 25b5bbd3b..2e7d675f9 100644 --- a/python-sdk/indexify/executor/function_worker/function_worker.py +++ b/python-sdk/indexify/executor/function_worker/function_worker.py @@ -1,8 +1,11 @@ -import multiprocessing as mp +import asyncio import sys import traceback from asyncio import Future +from concurrent.futures import ThreadPoolExecutor +from keyword import kwlist from typing import Dict, List, Optional +from queue import Queue import cloudpickle from pydantic import BaseModel @@ -92,44 +95,28 @@ class FunctionWorker: def __init__( self, workers: int = get_optimal_process_count(), - pool_size: int = 1000, indexify_client: IndexifyClient = None, ) -> None: self._workers: int = workers self._indexify_client: IndexifyClient = indexify_client - self.job_queue: mp.Queue[tuple[Future, Job]] = mp.Queue(maxsize=pool_size) - self.shutdown_event = mp.Event() - self.running_processes: list[mp.Process] = [] - self.finished_jobs = mp.Queue() - self._run() + self._loop = asyncio.get_event_loop() + self._executor = ThreadPoolExecutor(max_workers=self._workers) + self._loop.set_default_executor(self._executor) - def _run(self): - while not self.shutdown_event.is_set(): - if not self.job_queue.empty(): - if len(self.running_processes) < self._workers: - future, job = self.job_queue.get() - process = mp.Process(target=self._run_process, args=(future, job)) - process.start() - self.running_processes.append(process) - else: - # pass and wait for some processes to finish. - continue - - def _run_process(self, future: Future, job: Job): + def _run_process(self, **kwargs): try: result = _run_function( - job.namespace, - job.graph_name, - job.fn_name, - job.input, - job.code_path, - job.version, - job.init_value, - job.invocation_id, + kwargs["namespace"], + kwargs["graph_name"], + kwargs["fn_name"], + kwargs["input"], + kwargs["code_path"], + kwargs["version"], + kwargs["init_value"], + kwargs["invocation_id"], self._indexify_client, ) - future.set_result( - FunctionWorkerOutput( + return FunctionWorkerOutput( fn_outputs=result.fn_outputs, router_output=result.router_output, stdout=result.stdout, @@ -137,55 +124,24 @@ def _run_process(self, future: Future, job: Job): reducer=result.reducer, success=result.success, ) - ) except Exception as e: - future.set_result( - FunctionWorkerOutput( + return FunctionWorkerOutput( stdout=e.stdout, stderr=e.stderr, reducer=e.is_reducer, success=False, ) - ) - async def async_submit( + def async_submit( self, - namespace: str, - graph_name: str, - fn_name: str, - input: IndexifyData, - code_path: str, - version: int, - init_value: Optional[IndexifyData] = None, - invocation_id: Optional[str] = None, + **kwargs ) -> Future: - completion_future = Future() - self.job_queue.put( - ( - completion_future, - Job( - namespace=namespace, - graph_name=graph_name, - fn_name=fn_name, - input=input, - code_path=code_path, - version=version, - init_value=init_value, - invocation_id=invocation_id, - indexify_client=self._indexify_client, - ), - ) - ) - return completion_future + return self._loop.run_in_executor(None, _run_function, kwargs) def shutdown(self): - self.shutdown_event.set() - # kill the processes when we receive shutdown signal - print( - f"[bold] function_worker: [/bold] Shutdown signal received, killing {len(self.running_processes)} processes" - ) - for process in self.running_processes: - process.kill() + # kill everything + self._loop.shutdown_default_executor() + self._loop.stop() def _run_function(