Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

python-sdk(feat): Support async function #1017 #1046

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
fecd1bb
[WIP] refactored function worker into a separate folder
Default2882 Nov 20, 2024
a524073
Removed unused imports from cli.py
Default2882 Nov 20, 2024
00833ad
[WIP] using multiprocessing module to create processes for function w…
Default2882 Nov 20, 2024
57a3bec
Added server/indexify_server_state in .gitignore
Default2882 Nov 20, 2024
8b54182
Removing unused imports
Default2882 Nov 20, 2024
c66c59c
Ran make fmt
Default2882 Nov 20, 2024
fcd1156
bad commit
Default2882 Nov 20, 2024
85be72c
[WIP] Moved console from cli.py
Default2882 Nov 21, 2024
3e2a913
[WIP] Added ThreadPool to event loop
Default2882 Nov 21, 2024
297c85b
[WIP] Added models for run and download task
Default2882 Nov 21, 2024
e93d5a2
[WIP] Using event loop in downloader.py
Default2882 Nov 21, 2024
bcd0e5b
[WIP] added event loop in function worker
Default2882 Nov 21, 2024
45e5789
[WIP] Refactored task_reporter.py
Default2882 Nov 21, 2024
64b3c86
Added async function support in indexify_function decorator
Default2882 Nov 21, 2024
a3c2dfb
Working async functions
Default2882 Nov 21, 2024
4e5bfbf
linting
Default2882 Nov 21, 2024
59c1767
Removing unused objects
Default2882 Nov 21, 2024
82b139a
Added Indexify client in downloader.py
Default2882 Nov 21, 2024
2e63d3d
Deleting unused objects
Default2882 Nov 21, 2024
ed2a71b
Remove console.py
Default2882 Nov 21, 2024
18fc645
Refactored task reporter
Default2882 Nov 21, 2024
130a9c2
linting
Default2882 Nov 21, 2024
a87bd52
Refactoring agent.py
Default2882 Nov 21, 2024
6243033
Removing unused objects and fixing log_exception method
Default2882 Nov 27, 2024
55154e8
Added IndexifyClient in task_reporter.py
Default2882 Nov 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ charts/*.tgz
indexify_storage
indexify_local_runner_cache
server/indexify_storage
server/indexify_server_state
local_cache
.dev-tls
src/state/store/snapshots/*
Expand Down
3 changes: 0 additions & 3 deletions python-sdk/indexify/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from rich.theme import Theme

from indexify.executor.agent import ExtractorAgent
from indexify.executor.function_worker import FunctionWorker
from indexify.functions_sdk.image import (
DEFAULT_IMAGE_3_10,
DEFAULT_IMAGE_3_11,
Expand All @@ -31,7 +30,6 @@
"highlight": "magenta",
}
)

console = Console(theme=custom_theme)

app = typer.Typer(pretty_exceptions_enable=False, no_args_is_help=True)
Expand Down Expand Up @@ -250,7 +248,6 @@ def _build_image(image: Image, python_sdk_path: Optional[str] = None):

docker_file += "\n".join(run_strs)
print(os.getcwd())
import docker
import docker.api.build

docker.api.build.process_dockerfile = lambda dockerfile, path: (
Expand Down
363 changes: 154 additions & 209 deletions python-sdk/indexify/executor/agent.py

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions python-sdk/indexify/executor/api_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ class RouterOutput(BaseModel):
edges: List[str]


class FnOutput(BaseModel):
payload: Json


class TaskResult(BaseModel):
router_output: Optional[RouterOutput] = None
outcome: str
Expand Down
87 changes: 42 additions & 45 deletions python-sdk/indexify/executor/downloader.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import asyncio
import os
from typing import Optional

import httpx
from pydantic import BaseModel
from rich.console import Console
from rich.panel import Panel
from rich.theme import Theme

from indexify.functions_sdk.data_objects import IndexifyData

from ..common_util import get_httpx_client
from .. import IndexifyClient
from ..functions_sdk.object_serializer import JsonSerializer, get_serializer
from .api_objects import Task
from .executor_tasks import DownloadTask, TaskEnum

custom_theme = Theme(
{
Expand All @@ -31,11 +32,29 @@ class DownloadedInputs(BaseModel):

class Downloader:
def __init__(
self, code_path: str, base_url: str, config_path: Optional[str] = None
self,
code_path: str,
base_url: str,
indexify_client: IndexifyClient,
):
self.code_path = code_path
self.base_url = base_url
self._client = get_httpx_client(config_path)
self._indexify_client = indexify_client
self._event_loop = asyncio.get_event_loop()

def download(self, task, name):
if name == TaskEnum.DOWNLOAD_GRAPH_TASK:
coroutine = self.download_graph(
task.namespace, task.compute_graph, task.graph_version
)
elif name == TaskEnum.DOWNLOAD_INPUT_TASK:
coroutine = self.download_input(task)
else:
raise ValueError(f"Unsupported task name: {name}")

return DownloadTask(
task=task, coroutine=coroutine, name=name, loop=self._event_loop
)

async def download_graph(self, namespace: str, name: str, version: int) -> str:
path = os.path.join(self.code_path, namespace, f"{name}.{version}")
Expand All @@ -50,21 +69,7 @@ async def download_graph(self, namespace: str, name: str, version: int) -> str:
)
)

response = self._client.get(
f"{self.base_url}/internal/namespaces/{namespace}/compute_graphs/{name}/code"
)
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
console.print(
Panel(
f"Failed to download graph: {name}\nError: {response.text}",
title="downloader error",
border_style="error",
)
)
raise

response = self._indexify_client.download_graph(namespace, name)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "wb") as f:
f.write(response.content)
Expand All @@ -83,25 +88,29 @@ async def download_input(self, task: Task) -> DownloadedInputs:

console.print(
Panel(
f"downloading input\nURL: {url} \n reducer input URL: {reducer_url}",
f"downloading input\nFunction: {task.compute_fn} \n reducer id: {task.reducer_output_id}",
title="downloader",
border_style="cyan",
)
)

response = self._client.get(url)

try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
console.print(
Panel(
f"failed to download input: {task.input_key}\nError: {response.text}",
title="downloader error",
border_style="error",
)
input_id = task.input_key.split("|")[-1]
if task.invocation_id == input_id:
response = self._indexify_client.download_fn_input(
task.namespace, task.compute_graph, task.invocation_id
)
else:
response = self._indexify_client.download_fn_output(task.input_key)

init_value = None
if task.reducer_output_id:
init_value = self._indexify_client.download_reducer_input(
task.namespace,
task.compute_graph,
task.invocation_id,
task.compute_fn,
task.reducer_output_id,
)
raise

encoder = (
"json"
Expand All @@ -119,19 +128,7 @@ async def download_input(self, task: Task) -> DownloadedInputs:

deserialized_content = serializer.deserialize(response.content)

if reducer_url:
init_value = self._client.get(reducer_url)
try:
init_value.raise_for_status()
except httpx.HTTPStatusError as e:
console.print(
Panel(
f"failed to download reducer output: {task.reducer_output_id}\nError: {init_value.text}",
title="downloader error",
border_style="error",
)
)
raise
if init_value:
init_value = serializer.deserialize(init_value.content)
return DownloadedInputs(
input=IndexifyData(
Expand Down
74 changes: 30 additions & 44 deletions python-sdk/indexify/executor/executor_tasks.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,59 @@
import asyncio
from typing import Optional

from indexify.functions_sdk.data_objects import IndexifyData
from enum import Enum, unique
from typing import Coroutine

from .api_objects import Task
from .downloader import Downloader
from .function_worker import FunctionWorker


class DownloadGraphTask(asyncio.Task):
class RunFunctionTask(asyncio.Task):
def __init__(
self,
*,
task: Task,
downloader: Downloader,
coroutine: Coroutine,
loop: asyncio.AbstractEventLoop,
**kwargs,
):
kwargs["name"] = "download_graph"
kwargs["loop"] = asyncio.get_event_loop()
kwargs["name"] = TaskEnum.RUN_FUNCTION_TASK.value
kwargs["loop"] = loop
super().__init__(
downloader.download_graph(
task.namespace, task.compute_graph, task.graph_version
),
coroutine,
**kwargs,
)
self.task = task


class DownloadInputTask(asyncio.Task):
class DownloadTask(asyncio.Task):
def __init__(
self,
*,
task: Task,
downloader: Downloader,
coroutine: Coroutine,
name: str,
loop: asyncio.AbstractEventLoop,
**kwargs,
):
kwargs["name"] = "download_input"
kwargs["loop"] = asyncio.get_event_loop()
if not isinstance(name, TaskEnum):
raise ValueError(f"name '{name}' must be TaskEnum")
kwargs["name"] = name.value
kwargs["loop"] = loop
super().__init__(
downloader.download_input(task),
coroutine,
**kwargs,
)
self.task = task


class ExtractTask(asyncio.Task):
def __init__(
self,
*,
function_worker: FunctionWorker,
task: Task,
input: IndexifyData,
init_value: Optional[IndexifyData] = None,
code_path: str,
**kwargs,
):
kwargs["name"] = "run_function"
kwargs["loop"] = asyncio.get_event_loop()
super().__init__(
function_worker.async_submit(
namespace=task.namespace,
graph_name=task.compute_graph,
fn_name=task.compute_fn,
input=input,
init_value=init_value,
code_path=code_path,
version=task.graph_version,
invocation_id=task.invocation_id,
),
**kwargs,
)
self.task = task
@unique
class TaskEnum(Enum):
    """Closed set of executor task names.

    Each member's value is a plain string. ``RunFunctionTask`` and
    ``DownloadTask`` pass ``member.value`` as the ``name`` of the underlying
    ``asyncio.Task``; ``from_value`` maps such a string back to its member.
    """

    GET_RUNNABLE_TASK = "get_runnable_tasks"
    RUN_FUNCTION_TASK = "run_function"
    DOWNLOAD_GRAPH_TASK = "download_graph"
    DOWNLOAD_INPUT_TASK = "download_input"

    @classmethod
    def from_value(cls, value):
        """Return the member whose value equals ``value``.

        Args:
            value: The string value of a member (e.g. ``"run_function"``).
                An existing ``TaskEnum`` member is also accepted and is
                returned unchanged.

        Returns:
            The matching ``TaskEnum`` member.

        Raises:
            ValueError: If no member has the given value.
        """
        try:
            # Enum's own by-value lookup replaces the hand-written scan.
            return cls(value)
        except ValueError:
            # Keep the message callers already see instead of Enum's default
            # "'x' is not a valid TaskEnum".
            raise ValueError(f"No task found with value {value}") from None
3 changes: 3 additions & 0 deletions python-sdk/indexify/executor/function_worker/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .function_worker import FunctionWorker

__all__ = ["FunctionWorker"]
Loading
Loading