Skip to content

Commit

Permalink
bad commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Default2882 committed Nov 21, 2024
1 parent 4b40a3f commit 7534402
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 68 deletions.
3 changes: 3 additions & 0 deletions python-sdk/indexify/executor/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def __init__(

self._task_store: TaskStore = TaskStore()
self._executor_id = executor_id
console.print("Starting FunctionWorker", style="cyan bold")
self._function_worker = FunctionWorker(
workers=num_workers,
indexify_client=IndexifyClient(
Expand Down Expand Up @@ -360,6 +361,7 @@ async def task_launcher(self):
continue

async def run(self):
console.print("Starting Extractor Agent...", style="green")
import signal

asyncio.get_event_loop().add_signal_handler(
Expand All @@ -376,6 +378,7 @@ def to_sentence_case(snake_str):
words = snake_str.split("_")
return words[0].capitalize() + "" + " ".join(words[1:])

console.print("Starting Probe....")
runtime_probe: ProbeInfo = self._probe.probe()

executor_version = version("indexify")
Expand Down
28 changes: 28 additions & 0 deletions python-sdk/indexify/executor/executor_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,31 @@ def __init__(
**kwargs,
)
self.task = task

class ExtractTask(asyncio.Future):
def __init__(
self,
*,
function_worker: FunctionWorker,
task: Task,
input: IndexifyData,
init_value: Optional[IndexifyData] = None,
code_path: str,
**kwargs,
):
kwargs["name"] = "run_function"
kwargs["loop"] = asyncio.get_event_loop()
super().__init__(
function_worker.async_submit(
namespace=task.namespace,
graph_name=task.compute_graph,
fn_name=task.compute_fn,
input=input,
init_value=init_value,
code_path=code_path,
version=task.graph_version,
invocation_id=task.invocation_id,
),
**kwargs,
)
self.task = task
92 changes: 24 additions & 68 deletions python-sdk/indexify/executor/function_worker/function_worker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import multiprocessing as mp
import asyncio
import sys
import traceback
from asyncio import Future
from concurrent.futures import ThreadPoolExecutor
from keyword import kwlist
from typing import Dict, List, Optional
from queue import Queue

import cloudpickle
from pydantic import BaseModel
Expand Down Expand Up @@ -92,100 +95,53 @@ class FunctionWorker:
def __init__(
self,
workers: int = get_optimal_process_count(),
pool_size: int = 1000,
indexify_client: IndexifyClient = None,
) -> None:
self._workers: int = workers
self._indexify_client: IndexifyClient = indexify_client
self.job_queue: mp.Queue[tuple[Future, Job]] = mp.Queue(maxsize=pool_size)
self.shutdown_event = mp.Event()
self.running_processes: list[mp.Process] = []
self.finished_jobs = mp.Queue()
self._run()
self._loop = asyncio.get_event_loop()
self._executor = ThreadPoolExecutor(max_workers=self._workers)
self._loop.set_default_executor(self._executor)

def _run(self):
while not self.shutdown_event.is_set():
if not self.job_queue.empty():
if len(self.running_processes) < self._workers:
future, job = self.job_queue.get()
process = mp.Process(target=self._run_process, args=(future, job))
process.start()
self.running_processes.append(process)
else:
# pass and wait for some processes to finish.
continue

def _run_process(self, future: Future, job: Job):
def _run_process(self, **kwargs):
try:
result = _run_function(
job.namespace,
job.graph_name,
job.fn_name,
job.input,
job.code_path,
job.version,
job.init_value,
job.invocation_id,
kwargs["namespace"],
kwargs["graph_name"],
kwargs["fn_name"],
kwargs["input"],
kwargs["code_path"],
kwargs["version"],
kwargs["init_value"],
kwargs["invocation_id"],
self._indexify_client,
)
future.set_result(
FunctionWorkerOutput(
return FunctionWorkerOutput(
fn_outputs=result.fn_outputs,
router_output=result.router_output,
stdout=result.stdout,
stderr=result.stderr,
reducer=result.reducer,
success=result.success,
)
)
except Exception as e:
future.set_result(
FunctionWorkerOutput(
return FunctionWorkerOutput(
stdout=e.stdout,
stderr=e.stderr,
reducer=e.is_reducer,
success=False,
)
)

async def async_submit(
def async_submit(
self,
namespace: str,
graph_name: str,
fn_name: str,
input: IndexifyData,
code_path: str,
version: int,
init_value: Optional[IndexifyData] = None,
invocation_id: Optional[str] = None,
**kwargs
) -> Future:
completion_future = Future()
self.job_queue.put(
(
completion_future,
Job(
namespace=namespace,
graph_name=graph_name,
fn_name=fn_name,
input=input,
code_path=code_path,
version=version,
init_value=init_value,
invocation_id=invocation_id,
indexify_client=self._indexify_client,
),
)
)
return completion_future
return self._loop.run_in_executor(None, _run_function, kwargs)

def shutdown(self):
self.shutdown_event.set()
# kill the processes when we receive shutdown signal
print(
f"[bold] function_worker: [/bold] Shutdown signal received, killing {len(self.running_processes)} processes"
)
for process in self.running_processes:
process.kill()
# kill everything
self._loop.shutdown_default_executor()
self._loop.stop()


def _run_function(
Expand Down

0 comments on commit 7534402

Please sign in to comment.