Skip to content

Commit

Permalink
[Dask] Deprecate with_requests (mlrun#2910)
Browse files Browse the repository at this point in the history
  • Loading branch information
alonmr authored Jan 16, 2023
1 parent 355c0b3 commit 321c91a
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 16 deletions.
1 change: 1 addition & 0 deletions mlrun/api/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from mlrun.utils import logger


# TODO: From python 3.11 StrEnum is built-in and this will not be needed
class StrEnum(str, enum.Enum):
def __str__(self):
return self.value
Expand Down
15 changes: 5 additions & 10 deletions mlrun/runtimes/daskjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,18 +464,12 @@ def with_worker_limits(
"worker_resources", mem, cpu, gpus, gpu_type, patch=patch
)

def with_requests(self, mem=None, cpu=None):
warnings.warn(
"Dask's with_requests will be deprecated in 0.8.0, and will be removed in 0.10.0, use "
def with_requests(self, mem=None, cpu=None, patch: bool = False):
# TODO: In 1.4.0 change to NotImplementedError
raise DeprecationWarning(
"Dask's with_requests is deprecated and will be removed in 1.4.0, use "
"with_scheduler_requests/with_worker_requests instead",
# TODO: In 0.8.0 deprecate and replace with_requests to with_worker/scheduler_requests in examples & demos
# (or maybe just change behavior ?)
PendingDeprecationWarning,
)
# the scheduler/worker specific function was introduced after the general one, to keep backwards compatibility
# this function just sets the requests for both of them
self.with_scheduler_requests(mem, cpu)
self.with_worker_requests(mem, cpu)

def with_scheduler_requests(
self, mem: str = None, cpu: str = None, patch: bool = False
Expand Down Expand Up @@ -602,6 +596,7 @@ def enrich_dask_cluster(function, secrets, client_version):
env.append(spec.extra_pip)

pod_labels = get_resource_labels(function, scrape_metrics=config.scrape_metrics)
# TODO: 'dask-worker' has a deprecation notice, use 'dask worker' instead
worker_args = ["dask-worker", "--nthreads", str(spec.nthreads)]
memory_limit = spec.resources.get("limits", {}).get("memory")
if memory_limit:
Expand Down
11 changes: 9 additions & 2 deletions mlrun/runtimes/sparkjob/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,12 +672,19 @@ def with_igz_spark(self, mount_v3io_to_executor=True):
"file://" + config.spark_history_server_path
)

def with_limits(self, mem=None, cpu=None, gpus=None, gpu_type="nvidia.com/gpu"):
def with_limits(
self,
mem=None,
cpu=None,
gpus=None,
gpu_type="nvidia.com/gpu",
patch: bool = False,
):
raise NotImplementedError(
"In spark runtimes, please use with_driver_limits & with_executor_limits"
)

def with_requests(self, mem=None, cpu=None):
def with_requests(self, mem=None, cpu=None, patch: bool = False):
raise NotImplementedError(
"In spark runtimes, please use with_driver_requests & with_executor_requests"
)
Expand Down
3 changes: 0 additions & 3 deletions tests/api/runtimes/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,6 @@ def test_dask_runtime_with_resources(self, db: Session, client: TestClient):
runtime.with_worker_requests(
mem=expected_requests["memory"], cpu=expected_requests["cpu"]
)
runtime.with_requests(
mem=expected_requests["memory"], cpu=expected_requests["cpu"]
)
gpu_type = "nvidia.com/gpu"
expected_gpus = 2
expected_scheduler_limits = generate_resources(
Expand Down
2 changes: 1 addition & 1 deletion tests/system/feature_store/test_feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2802,7 +2802,7 @@ def test_get_offline_features_with_filter(self, engine):
)
dask_cluster.apply(mlrun.mount_v3io())
dask_cluster.spec.remote = True
dask_cluster.with_requests(mem="2G")
dask_cluster.with_worker_requests(mem="2G")
dask_cluster.save()
engine_args = {
"dask_client": dask_cluster,
Expand Down

0 comments on commit 321c91a

Please sign in to comment.