python-sdk(feat): Support async function #1017 #1046
8bc3537 to 7534402
@@ -272,6 +281,30 @@ def run_fn(
)
return output, None


async def run_fn_async(
This and other functions can use some refactoring to avoid code duplication.
f"Failed to get runnable tasks: {async_task.exception()}",
style="red",
task_name = TaskEnum.from_value(async_task.get_name())
match task_name:
This switch case can possibly use a strategy pattern
Yep, this is a large switch; it'd be better to just call a method for each case here.
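A minimal sketch of the "method per case" idea, assuming a dispatch table keyed by task type. The `TaskEnum` members, the `Agent` class, and the handler names here are hypothetical stand-ins, not the identifiers used in the PR:

```python
import asyncio
from enum import Enum
from typing import Callable, Dict, List, Tuple


class TaskEnum(Enum):
    # Hypothetical members; the real TaskEnum in the PR may differ.
    GET_RUNNABLE_TASKS = "get_runnable_tasks"
    DOWNLOAD_GRAPH = "download_graph"
    RUN_FUNCTION = "run_function"

    @classmethod
    def from_value(cls, value: str) -> "TaskEnum":
        return cls(value)


class Agent:
    """Hypothetical agent that routes finished asyncio tasks to handlers."""

    def __init__(self) -> None:
        self.handled: List[Tuple[str, object]] = []
        # One handler per task type replaces the large match statement.
        self._handlers: Dict[TaskEnum, Callable[[asyncio.Task], None]] = {
            TaskEnum.GET_RUNNABLE_TASKS: self._on_get_runnable_tasks,
            TaskEnum.DOWNLOAD_GRAPH: self._on_download_graph,
            TaskEnum.RUN_FUNCTION: self._on_run_function,
        }

    def handle(self, async_task: asyncio.Task) -> None:
        # Look up the handler by the task's name and delegate to it.
        task_name = TaskEnum.from_value(async_task.get_name())
        self._handlers[task_name](async_task)

    def _on_get_runnable_tasks(self, async_task: asyncio.Task) -> None:
        self.handled.append(("get_runnable_tasks", async_task.result()))

    def _on_download_graph(self, async_task: asyncio.Task) -> None:
        self.handled.append(("download_graph", async_task.result()))

    def _on_run_function(self, async_task: asyncio.Task) -> None:
        self.handled.append(("run_function", async_task.result()))


async def demo() -> List[Tuple[str, object]]:
    agent = Agent()
    # The task name selects the handler, as in the reviewed snippet.
    task = asyncio.create_task(asyncio.sleep(0, result="graph"), name="download_graph")
    await task
    agent.handle(task)
    return agent.handled
```

Adding a new task type then means adding one enum member and one method, rather than growing a single `match` block.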
python-sdk/indexify/executor/function_worker/function_worker.py
Outdated
base_url: str,
executor_id: str,
task_store: TaskStore,
config_path: Optional[str] = None,
):
self._base_url = base_url
self._executor_id = executor_id
self._client = get_httpx_client(config_path)
Make the task reporter use IndexifyClient, because the basic HTTP client does not set the API key.
@@ -225,7 +225,23 @@ def remote_or_local_pipeline(pipeline, remote=True):
return pipeline


@indexify_function()
Add more tests.
wrt commit 11f50f5
There are a few comments to address but overall looks good. Thank you.
I'll check other commits.
Thank you for looking into this.
I started to review per commit but I see that there's no clear separation of commits in the PR. I'll then review full changes soon and let you know the feedback.
I also think that all the commits in this PR need to be squashed into a single commit because the commit titles are currently misleading and they'll just make the commit log and history confusing.
)


def _log(task_outcome):
Nit: why do we have underscores here? I'm asking because these functions are supposed to be visible outside of the task_reporter_utils.py file.
> These functions are supposed to be visible outside of task_reporter_utils.py file
No, they shouldn't be. The idea is to use a custom Console theme for different parts of the executor, although I can see that there are some misses related to this in other areas of the code.
class FunctionWorker:
def __init__(
self, workers: int = 1, indexify_client: IndexifyClient = None
self,
workers: int = get_optimal_process_count(),
This adds the extra responsibility of deciding how many worker processes to run. Do we need to add it here, given that the caller can already configure the worker count?
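One related detail: Python evaluates default argument values once, when the `def` statement runs, so a default like `get_optimal_process_count()` is computed at import time. A possible sketch that keeps the caller in control and resolves the fallback only when needed is below; the body of `get_optimal_process_count` is a hypothetical stand-in, since the PR's implementation is not shown here:

```python
import os
from typing import Optional


def get_optimal_process_count() -> int:
    # Hypothetical stand-in for the helper named in the diff.
    return max(1, (os.cpu_count() or 1) - 1)


class FunctionWorker:
    def __init__(self, workers: Optional[int] = None) -> None:
        # Resolve the fallback at call time, so an explicit caller value
        # always wins and nothing is computed at import time.
        self.workers = workers if workers is not None else get_optimal_process_count()
```

With this shape, `FunctionWorker(workers=4)` uses exactly four workers, and `FunctionWorker()` falls back to the helper.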
Overall question about this PR.
Do we need to refactor the whole Executor code base to be async in order to support async functions?
As stated in issue #1017:
> The function worker needs to detect if the user function is multiprocess and create an event loop, and call the function with asyncio.run or something of that nature. This might require our sdk to have async interfaces as well. That needs to be explored.
How does this PR help us get there?
FYI, we plan to implement running function code in separate processes soon. In that case, it matters even less whether the Executor runs fully in an async event loop.
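For reference, the approach described in the issue can be sketched without making the rest of the executor async. This is only an illustration under assumptions: `run_user_function`, `add_sync`, and `add_async` are hypothetical names, and the sketch assumes the worker is not already running inside an event loop (`asyncio.run` raises `RuntimeError` if one is running in the same thread):

```python
import asyncio
import inspect
from typing import Any, Callable


def run_user_function(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Call a user function, driving it with an event loop if it is async."""
    if inspect.iscoroutinefunction(fn):
        # asyncio.run creates a fresh event loop, runs the coroutine to
        # completion, and closes the loop afterwards.
        return asyncio.run(fn(*args, **kwargs))
    return fn(*args, **kwargs)


def add_sync(a: int, b: int) -> int:
    return a + b


async def add_async(a: int, b: int) -> int:
    await asyncio.sleep(0)  # stands in for I/O-bound work
    return a + b
```

Both `run_user_function(add_sync, 1, 2)` and `run_user_function(add_async, 1, 2)` return the sum, so the caller does not need to know which kind of function it was given.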
python-sdk/indexify/executor/function_worker/function_worker.py
Outdated
@@ -63,6 +64,9 @@ def __init__(
name_alias: Optional[str] = None,
image_version: Optional[int] = None,
):
event_loop = asyncio.get_event_loop()
self._thread_pool = ThreadPoolExecutor(max_workers=num_workers)
Currently ProcessPoolExecutor is used. Why do we switch to ThreadPoolExecutor here? What are the implications of this switch? E.g. how do we make sure that there are no deadlocks between the threads now and in the future?
Even though we were importing and initialising ProcessPoolExecutor, it wasn't being used anywhere. In the existing implementation, the agent, downloader, function_worker, and task_reporter all execute on a single event loop.
As per https://docs.python.org/3/library/concurrent.futures.html, ProcessPoolExecutor and ThreadPoolExecutor expose the same interface; the difference is that one uses subprocesses and the other uses threads.
> E.g. how do we make sure that there are no deadlocks between the threads now and in the future?
Each thread is assigned a single unit of work, so I don't see how we can deadlock here.
Yep, this is right: ProcessPoolExecutor is indeed not used. It seems unnecessary to explicitly create a ThreadPoolExecutor, though. See https://stackoverflow.com/questions/60204054/default-executor-asyncio. The Python event loop needs a thread pool anyway, because some operations, such as file reads/writes, are not supported by asyncio yet.
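A small sketch of the default-executor point: passing `None` to `loop.run_in_executor` uses the loop's default executor, which asyncio creates lazily as a `ThreadPoolExecutor`, so no explicit pool is needed. The function names below are hypothetical:

```python
import asyncio
import threading
from typing import Tuple


def blocking_work() -> str:
    # Stands in for a blocking call such as a file read; it reports
    # which thread it ran on.
    return threading.current_thread().name


async def main() -> Tuple[str, str]:
    loop = asyncio.get_running_loop()
    # None selects the event loop's default executor.
    worker_thread = await loop.run_in_executor(None, blocking_work)
    return threading.current_thread().name, worker_thread
```

Running `asyncio.run(main())` returns two different thread names, showing that the blocking call ran off the event loop thread.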
Oh yeah, this was never supposed to be reviewed commit by commit.
I will probably squash the commits and force-push to get a cleaner commit history.
I am aware of this and was trying to reuse the event loop to make the executor completely async (I also took the liberty of refactoring the code a bit), and to give consumers of python-sdk the option to run async indexify functions by using a ThreadPoolExecutor with an event loop. However, if we want to move away from the event loop entirely and use processes/threads, then the executor needs to be completely rewritten due to a lack of proper abstractions.
11f50f5 to 55154e8
Yep, we'll discuss this internally and I'll come back to you in a few days with an answer about the plan for the code base.
We're going to implement function execution in separate processes in the OSS version next week. It's going to be a sizable patch. This allows functions to free all the GPU resources they consume once they finish, and it provides some other benefits.
Got it, should I cancel this PR? It won't be relevant anymore.
@Default2882 Let's close this since Eugene is rewriting the executor; we can revisit this feature after the new executor has landed.
Context
Currently, indexify does not support async functions. This matters because users would like to run functions that are I/O bound. Addresses #1017.
What
This change refactors the executor agent so that the downloader, function_worker and the task_reporter run in a ThreadPoolExecutor.
Testing
Added a basic test, will be adding more tests.
Contribution Checklist
make fmt in python-sdk/.
make fmt in server/.