Ran make fmt
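The changes in this commit are consistent with an import-sorting and code-formatting pass of the kind produced by isort and black: imports are regrouped, trailing commas are added, long call sites are wrapped, and two blank lines are restored between top-level definitions. The repository's actual make fmt target is not part of this commit, so the following is only a minimal sketch, assuming the target drives those two tools over the python-sdk tree:

# Hypothetical sketch of a fmt step; the real Makefile target is not shown in this commit.
import subprocess


def run_fmt(path: str = "python-sdk") -> None:
    """Sort imports and reformat code under the given path."""
    # isort regroups imports (stdlib / third-party / first-party / relative),
    # which is what moves the FunctionWorker imports in the diff below.
    subprocess.run(["isort", path], check=True)
    # black normalizes layout: trailing commas, wrapped call sites,
    # and blank lines between top-level definitions.
    subprocess.run(["black", path], check=True)


if __name__ == "__main__":
    run_fmt()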
Default2882 committed Nov 21, 2024
1 parent e7a9932 commit 4b40a3f
Showing 6 changed files with 49 additions and 39 deletions.
2 changes: 1 addition & 1 deletion python-sdk/indexify/executor/agent.py
@@ -14,6 +14,7 @@
 from rich.theme import Theme
 
 from indexify.common_util import get_httpx_client
+from indexify.executor.function_worker import FunctionWorker
 from indexify.functions_sdk.data_objects import (
     FunctionWorkerOutput,
     IndexifyData,
@@ -26,7 +27,6 @@
 from .api_objects import ExecutorMetadata, Task
 from .downloader import DownloadedInputs, Downloader
 from .executor_tasks import DownloadGraphTask, DownloadInputTask, ExtractTask
-from indexify.executor.function_worker import FunctionWorker
 from .runtime_probes import ProbeInfo, RuntimeProbes
 from .task_reporter import TaskReporter
 from .task_store import CompletedTask, TaskStore
2 changes: 1 addition & 1 deletion python-sdk/indexify/executor/executor_tasks.py
@@ -1,11 +1,11 @@
 import asyncio
 from typing import Optional
 
+from indexify.executor.function_worker import FunctionWorker
 from indexify.functions_sdk.data_objects import IndexifyData
 
 from .api_objects import Task
 from .downloader import Downloader
-from indexify.executor.function_worker import FunctionWorker
 
 
 class DownloadGraphTask(asyncio.Task):
4 changes: 1 addition & 3 deletions python-sdk/indexify/executor/function_worker/__init__.py
@@ -1,5 +1,3 @@
 from .function_worker import FunctionWorker
 
-__all__ = [
-    "FunctionWorker"
-]
+__all__ = ["FunctionWorker"]
76 changes: 43 additions & 33 deletions python-sdk/indexify/executor/function_worker/function_worker.py
@@ -1,16 +1,17 @@
+import multiprocessing as mp
 import sys
 import traceback
 from asyncio import Future
 from typing import Dict, List, Optional
-import multiprocessing as mp
 
 import cloudpickle
 from pydantic import BaseModel
 from rich import print
 
 from indexify import IndexifyClient
-from indexify.executor.function_worker.function_worker_utils import \
-    get_optimal_process_count
+from indexify.executor.function_worker.function_worker_utils import (
+    get_optimal_process_count,
+)
 from indexify.functions_sdk.data_objects import (
     FunctionWorkerOutput,
     IndexifyData,
@@ -25,6 +26,7 @@
 
 function_wrapper_map: Dict[str, IndexifyFunctionWrapper] = {}
 
+
 class FunctionRunException(Exception):
     def __init__(
         self, exception: Exception, stdout: str, stderr: str, is_reducer: bool
@@ -74,6 +76,7 @@ def _load_function(
     )
     function_wrapper_map[key] = function_wrapper
 
+
 class Job(BaseModel):
     namespace: str
     graph_name: str
@@ -84,12 +87,13 @@ class Job(BaseModel):
     init_value: Optional[IndexifyData] = None
     invocation_id: Optional[str] = None
 
+
 class FunctionWorker:
     def __init__(
        self,
        workers: int = get_optimal_process_count(),
        pool_size: int = 1000,
-        indexify_client: IndexifyClient = None
+        indexify_client: IndexifyClient = None,
    ) -> None:
        self._workers: int = workers
        self._indexify_client: IndexifyClient = indexify_client
@@ -104,10 +108,7 @@ def _run(self):
             if not self.job_queue.empty():
                 if len(self.running_processes) < self._workers:
                     future, job = self.job_queue.get()
-                    process = mp.Process(
-                        target=self._run_process,
-                        args=(future, job)
-                    )
+                    process = mp.Process(target=self._run_process, args=(future, job))
                     process.start()
                     self.running_processes.append(process)
                 else:
@@ -127,21 +128,25 @@ def _run_process(self, future: Future, job: Job):
                 job.invocation_id,
                 self._indexify_client,
             )
-            future.set_result(FunctionWorkerOutput(
-                fn_outputs=result.fn_outputs,
-                router_output=result.router_output,
-                stdout=result.stdout,
-                stderr=result.stderr,
-                reducer=result.reducer,
-                success=result.success,
-            ))
+            future.set_result(
+                FunctionWorkerOutput(
+                    fn_outputs=result.fn_outputs,
+                    router_output=result.router_output,
+                    stdout=result.stdout,
+                    stderr=result.stderr,
+                    reducer=result.reducer,
+                    success=result.success,
+                )
+            )
         except Exception as e:
-            future.set_result(FunctionWorkerOutput(
-                stdout=e.stdout,
-                stderr=e.stderr,
-                reducer=e.is_reducer,
-                success=False,
-            ))
+            future.set_result(
+                FunctionWorkerOutput(
+                    stdout=e.stdout,
+                    stderr=e.stderr,
+                    reducer=e.is_reducer,
+                    success=False,
+                )
+            )
 
     async def async_submit(
         self,
@@ -155,17 +160,22 @@ async def async_submit(
         invocation_id: Optional[str] = None,
     ) -> Future:
         completion_future = Future()
-        self.job_queue.put((completion_future, Job(
-            namespace=namespace,
-            graph_name=graph_name,
-            fn_name=fn_name,
-            input=input,
-            code_path=code_path,
-            version=version,
-            init_value=init_value,
-            invocation_id=invocation_id,
-            indexify_client=self._indexify_client,
-        )))
+        self.job_queue.put(
+            (
+                completion_future,
+                Job(
+                    namespace=namespace,
+                    graph_name=graph_name,
+                    fn_name=fn_name,
+                    input=input,
+                    code_path=code_path,
+                    version=version,
+                    init_value=init_value,
+                    invocation_id=invocation_id,
+                    indexify_client=self._indexify_client,
+                ),
+            )
+        )
         return completion_future
 
     def shutdown(self):
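For orientation, the FunctionWorker reformatted above queues each submitted job together with a future; the dispatcher loop (_run) starts a separate process per job, and _run_process sets the outcome on that future. The following is a self-contained sketch of that queue-and-process pattern only, not the Indexify implementation; MiniWorkerPool and the trivial squaring job are placeholders:

# Simplified sketch of the pattern: submit work with a future, run it in a child process.
import multiprocessing as mp
from concurrent.futures import Future
from queue import Queue
from threading import Thread


def _square(conn, value: int) -> None:
    # Runs in the child process and sends the result back over a pipe.
    conn.send(value * value)
    conn.close()


class MiniWorkerPool:
    def __init__(self) -> None:
        self._queue = Queue()
        # The dispatcher thread plays the role of FunctionWorker._run,
        # pulling (future, job) pairs off the queue.
        Thread(target=self._run, daemon=True).start()

    def submit(self, value: int) -> Future:
        future: Future = Future()
        self._queue.put((future, value))
        return future

    def _run(self) -> None:
        while True:
            future, value = self._queue.get()
            parent_conn, child_conn = mp.Pipe()
            process = mp.Process(target=_square, args=(child_conn, value))
            process.start()
            future.set_result(parent_conn.recv())
            process.join()


if __name__ == "__main__":
    print(MiniWorkerPool().submit(7).result())  # prints 49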
1 change: 1 addition & 0 deletions python-sdk/indexify/executor/function_worker/function_worker_utils.py
@@ -10,6 +10,7 @@ def get_optimal_process_count():
     """
     return max(os.cpu_count() - 1, 1)
 
+
 def job_generator() -> str:
     """
     Generates job ID
3 changes: 2 additions & 1 deletion python-sdk/indexify/executor/indexify_executor.py
@@ -3,9 +3,10 @@
 
 import nanoid
 
-from .agent import ExtractorAgent
 from indexify.executor.function_worker.function_worker import FunctionWorker
 
+from .agent import ExtractorAgent
+
 
 def join(
     workers: int,
