From 321c91a4bc0916ec01181a3432743c02ef9b4062 Mon Sep 17 00:00:00 2001
From: Alon Maor <48641682+AlonMaor14@users.noreply.github.com>
Date: Mon, 16 Jan 2023 14:16:49 +0200
Subject: [PATCH] [Dask] Deprecate `with_requests` (#2910)

---
 mlrun/api/utils/helpers.py                       |  1 +
 mlrun/runtimes/daskjob.py                        | 15 +++++----------
 mlrun/runtimes/sparkjob/abstract.py              | 11 +++++++++--
 tests/api/runtimes/test_dask.py                  |  3 ---
 tests/system/feature_store/test_feature_store.py |  2 +-
 5 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/mlrun/api/utils/helpers.py b/mlrun/api/utils/helpers.py
index 2a1c278d627..43dcdc0c8ac 100644
--- a/mlrun/api/utils/helpers.py
+++ b/mlrun/api/utils/helpers.py
@@ -20,6 +20,7 @@
 from mlrun.utils import logger
 
 
+# TODO: From python 3.11 StrEnum is built-in and this will not be needed
 class StrEnum(str, enum.Enum):
     def __str__(self):
         return self.value
diff --git a/mlrun/runtimes/daskjob.py b/mlrun/runtimes/daskjob.py
index 5d4f9ca577c..19f847dfd25 100644
--- a/mlrun/runtimes/daskjob.py
+++ b/mlrun/runtimes/daskjob.py
@@ -464,18 +464,12 @@ def with_worker_limits(
             "worker_resources", mem, cpu, gpus, gpu_type, patch=patch
         )
 
-    def with_requests(self, mem=None, cpu=None):
-        warnings.warn(
-            "Dask's with_requests will be deprecated in 0.8.0, and will be removed in 0.10.0, use "
+    def with_requests(self, mem=None, cpu=None, patch: bool = False):
+        # TODO: In 1.4.0 change to NotImplementedError
+        raise DeprecationWarning(
+            "Dask's with_requests is deprecated and will be removed in 1.4.0, use "
             "with_scheduler_requests/with_worker_requests instead",
-            # TODO: In 0.8.0 deprecate and replace with_requests to with_worker/scheduler_requests in examples & demos
-            # (or maybe just change behavior ?)
-            PendingDeprecationWarning,
         )
-        # the scheduler/worker specific function was introduced after the general one, to keep backwards compatibility
-        # this function just sets the requests for both of them
-        self.with_scheduler_requests(mem, cpu)
-        self.with_worker_requests(mem, cpu)
 
     def with_scheduler_requests(
         self, mem: str = None, cpu: str = None, patch: bool = False
@@ -602,6 +596,7 @@ def enrich_dask_cluster(function, secrets, client_version):
         env.append(spec.extra_pip)
 
     pod_labels = get_resource_labels(function, scrape_metrics=config.scrape_metrics)
+    # TODO: 'dask-worker' has deprecation notice, use 'dask worker' instead
     worker_args = ["dask-worker", "--nthreads", str(spec.nthreads)]
     memory_limit = spec.resources.get("limits", {}).get("memory")
     if memory_limit:
diff --git a/mlrun/runtimes/sparkjob/abstract.py b/mlrun/runtimes/sparkjob/abstract.py
index b3df0f8bf7b..8279cd8e9ba 100644
--- a/mlrun/runtimes/sparkjob/abstract.py
+++ b/mlrun/runtimes/sparkjob/abstract.py
@@ -672,12 +672,19 @@ def with_igz_spark(self, mount_v3io_to_executor=True):
             "file://" + config.spark_history_server_path
         )
 
-    def with_limits(self, mem=None, cpu=None, gpus=None, gpu_type="nvidia.com/gpu"):
+    def with_limits(
+        self,
+        mem=None,
+        cpu=None,
+        gpus=None,
+        gpu_type="nvidia.com/gpu",
+        patch: bool = False,
+    ):
         raise NotImplementedError(
             "In spark runtimes, please use with_driver_limits & with_executor_limits"
         )
 
-    def with_requests(self, mem=None, cpu=None):
+    def with_requests(self, mem=None, cpu=None, patch: bool = False):
         raise NotImplementedError(
             "In spark runtimes, please use with_driver_requests & with_executor_requests"
         )
diff --git a/tests/api/runtimes/test_dask.py b/tests/api/runtimes/test_dask.py
index a5e6e65dca9..1f920a23d18 100644
--- a/tests/api/runtimes/test_dask.py
+++ b/tests/api/runtimes/test_dask.py
@@ -211,9 +211,6 @@ def test_dask_runtime_with_resources(self, db: Session, client: TestClient):
         runtime.with_worker_requests(
             mem=expected_requests["memory"], cpu=expected_requests["cpu"]
         )
-        runtime.with_requests(
-            mem=expected_requests["memory"], cpu=expected_requests["cpu"]
-        )
         gpu_type = "nvidia.com/gpu"
         expected_gpus = 2
         expected_scheduler_limits = generate_resources(
diff --git a/tests/system/feature_store/test_feature_store.py b/tests/system/feature_store/test_feature_store.py
index 88b95cc10f9..453d4a669c8 100644
--- a/tests/system/feature_store/test_feature_store.py
+++ b/tests/system/feature_store/test_feature_store.py
@@ -2802,7 +2802,7 @@ def test_get_offline_features_with_filter(self, engine):
         )
         dask_cluster.apply(mlrun.mount_v3io())
         dask_cluster.spec.remote = True
-        dask_cluster.with_requests(mem="2G")
+        dask_cluster.with_worker_requests(mem="2G")
         dask_cluster.save()
         engine_args = {
             "dask_client": dask_cluster,