From a9431c7e4a2846a773cbd9a689bb23db0a0d7cc8 Mon Sep 17 00:00:00 2001
From: Sihan Wang
Date: Tue, 11 Apr 2023 15:29:42 -0700
Subject: [PATCH 1/2] [Serve][Release][Part2] Add release tests to GCE (#34245)

Makes some Serve release tests run on GCE.

serve_serve_micro_benchmark succeeded: https://buildkite.com/ray-project/release-tests-pr/builds/34505#01876e9b-b99c-40de-af94-6c7044b401dc
deployment_graph_long_chain succeeded: https://buildkite.com/ray-project/release-tests-pr/builds/34507#01876eb9-8727-4a59-b193-ce2ff5e9647e
deployment_graph_wide_ensemble succeeded: https://buildkite.com/ray-project/release-tests-pr/builds/34510#01876eca-27ce-4234-b668-eea78767910d
serve_handle_long_chain succeeded: https://buildkite.com/ray-project/release-tests-pr/builds/34553#01877135-9e44-4653-ae53-be785ca5574c
serve_handle_wide_ensemble succeeded: https://buildkite.com/ray-project/release-tests-pr/builds/34566#0187714b-45ff-4fd9-8a22-8c8bb45b0748
serve_micro_protocol_grpc_benchmark succeeded: https://buildkite.com/ray-project/release-tests-pr/builds/34569#0187715b-d78b-4b3a-a0be-4cb556482729
serve_micro_protocol_http_benchmark succeeded: https://buildkite.com/ray-project/release-tests-pr/builds/34570#0187716b-e710-4688-b31d-c8cc3f67ab4f
serve_resnet_benchmark succeeded: https://buildkite.com/ray-project/release-tests-pr/builds/34604#018771c7-9a7f-42f1-a6ce-766e32e48fae
---
 release/release_tests.yaml                    | 104 ++++++++++++++++++
 .../serve_tests/compute_tpl_32_cpu_gce.yaml   |   2 +-
 .../serve_tests/compute_tpl_gpu_node_gce.yaml |  17 +++
 .../compute_tpl_single_node_32_cpu_gce.yaml   |  17 +++
 .../compute_tpl_single_node_gce.yaml          |  17 +++
 5 files changed, 156 insertions(+), 1 deletion(-)
 create mode 100644 release/serve_tests/compute_tpl_gpu_node_gce.yaml
 create mode 100644 release/serve_tests/compute_tpl_single_node_32_cpu_gce.yaml
 create mode 100644 release/serve_tests/compute_tpl_single_node_gce.yaml

diff --git a/release/release_tests.yaml b/release/release_tests.yaml
index 5caf5597dcf9..8c4efd187cbd 100644
--- a/release/release_tests.yaml
+++ b/release/release_tests.yaml
@@ -2713,6 +2713,19 @@
 
   alert: default
 
+  variations:
+    - __suffix__: aws
+    - __suffix__: gce
+      env: gce
+      frequency: manual
+      smoke_test:
+        frequency: manual
+      run:
+        timeout: 7200
+      cluster:
+        cluster_env: app_config.yaml
+        cluster_compute: compute_tpl_single_node_gce.yaml
+
 # - name: serve_serve_micro_benchmark_k8s
 #   group: Serve tests
 #   working_dir: serve_tests
@@ -2752,6 +2765,19 @@
   alert: default
   stable: False
 
+  variations:
+    - __suffix__: aws
+    - __suffix__: gce
+      env: gce
+      frequency: manual
+      smoke_test:
+        frequency: manual
+      run:
+        timeout: 3600
+      cluster:
+        cluster_env: app_config.yaml
+        cluster_compute: compute_tpl_single_node_32_cpu_gce.yaml
+
 - name: deployment_graph_wide_ensemble
   group: Serve tests
   working_dir: serve_tests
@@ -2771,6 +2797,19 @@
   alert: default
   stable: False
 
+  variations:
+    - __suffix__: aws
+    - __suffix__: gce
+      env: gce
+      frequency: manual
+      smoke_test:
+        frequency: manual
+      run:
+        timeout: 3600
+      cluster:
+        cluster_env: app_config.yaml
+        cluster_compute: compute_tpl_single_node_32_cpu_gce.yaml
+
 - name: serve_handle_long_chain
   group: Serve tests
   working_dir: serve_tests
@@ -2790,6 +2829,19 @@
   alert: default
   stable: False
 
+  variations:
+    - __suffix__: aws
+    - __suffix__: gce
+      env: gce
+      frequency: manual
+      smoke_test:
+        frequency: manual
+      run:
+        timeout: 3600
+      cluster:
+        cluster_env: app_config.yaml
+        cluster_compute: compute_tpl_single_node_32_cpu_gce.yaml
+
 - name: serve_handle_wide_ensemble
   group: Serve tests
   working_dir: serve_tests
@@ -2809,6 +2861,19 @@
   alert: default
   stable: False
 
+  variations:
+    - __suffix__: aws
+    - __suffix__: gce
+      env: gce
+      frequency: manual
+      smoke_test:
+        frequency: manual
+      run:
+        timeout: 3600
+      cluster:
+        cluster_env: app_config.yaml
+        cluster_compute: compute_tpl_single_node_32_cpu_gce.yaml
+
 - name: serve_micro_protocol_grpc_benchmark
   group: Serve tests
@@ -2828,6 +2893,19 @@
 
   alert: default
 
+  variations:
+    - __suffix__: aws
+    - __suffix__: gce
+      env: gce
+      frequency: manual
+      smoke_test:
+        frequency: manual
+      run:
+        timeout: 7200
+      cluster:
+        cluster_env: app_config.yaml
+        cluster_compute: compute_tpl_single_node_gce.yaml
+
 - name: serve_micro_protocol_http_benchmark
   group: Serve tests
   working_dir: serve_tests
@@ -2846,6 +2924,19 @@
 
   alert: default
 
+  variations:
+    - __suffix__: aws
+    - __suffix__: gce
+      env: gce
+      frequency: manual
+      smoke_test:
+        frequency: manual
+      run:
+        timeout: 7200
+      cluster:
+        cluster_env: app_config.yaml
+        cluster_compute: compute_tpl_single_node_gce.yaml
+
 - name: serve_resnet_benchmark
   group: Serve tests
@@ -2865,6 +2956,19 @@
 
   alert: default
 
+  variations:
+    - __suffix__: aws
+    - __suffix__: gce
+      env: gce
+      frequency: manual
+      smoke_test:
+        frequency: manual
+      run:
+        timeout: 7200
+      cluster:
+        cluster_env: gpu_app_config.yaml
+        cluster_compute: compute_tpl_gpu_node_gce.yaml
+
 ########################
 # Train tests
 ########################
diff --git a/release/serve_tests/compute_tpl_32_cpu_gce.yaml b/release/serve_tests/compute_tpl_32_cpu_gce.yaml
index e4cc0e474f59..26a2474f3af3 100644
--- a/release/serve_tests/compute_tpl_32_cpu_gce.yaml
+++ b/release/serve_tests/compute_tpl_32_cpu_gce.yaml
@@ -11,7 +11,7 @@ head_node_type:
 
 worker_node_types:
   - name: worker_node
-    instance_type: n2-standard-32 # m5.4xlarge
+    instance_type: n2-standard-32 # m5.8xlarge
     min_workers: 32
     max_workers: 32
     use_spot: false
\ No newline at end of file
diff --git a/release/serve_tests/compute_tpl_gpu_node_gce.yaml b/release/serve_tests/compute_tpl_gpu_node_gce.yaml
new file mode 100644
index 000000000000..9341c01b8899
--- /dev/null
+++ b/release/serve_tests/compute_tpl_gpu_node_gce.yaml
@@ -0,0 +1,17 @@
+cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
+region: us-west1
+allowed_azs:
+  - us-west1-b
+
+max_workers: 32
+
+head_node_type:
+  name: head_node
+  instance_type: n1-standard-16-nvidia-tesla-t4-1 # g4dn.4xlarge
+
+worker_node_types:
+  - name: worker_node
+    instance_type: n2-standard-16 # m5.4xlarge
+    min_workers: 0
+    max_workers: 1
+    use_spot: false
\ No newline at end of file
diff --git a/release/serve_tests/compute_tpl_single_node_32_cpu_gce.yaml b/release/serve_tests/compute_tpl_single_node_32_cpu_gce.yaml
new file mode 100644
index 000000000000..c3171d81fac9
--- /dev/null
+++ b/release/serve_tests/compute_tpl_single_node_32_cpu_gce.yaml
@@ -0,0 +1,17 @@
+cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
+region: us-west1
+allowed_azs:
+  - us-west1-c
+
+max_workers: 0
+
+head_node_type:
+  name: head_node
+  instance_type: n2-standard-32 # m5.8xlarge
+
+worker_node_types:
+  - name: worker_node
+    instance_type: n2-standard-32 # m5.8xlarge
+    min_workers: 0
+    max_workers: 0
+    use_spot: false
\ No newline at end of file
diff --git a/release/serve_tests/compute_tpl_single_node_gce.yaml b/release/serve_tests/compute_tpl_single_node_gce.yaml
new file mode 100644
index 000000000000..7729adaa517b
--- /dev/null
+++ b/release/serve_tests/compute_tpl_single_node_gce.yaml
@@ -0,0 +1,17 @@
+cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
+region: us-west1
+allowed_azs:
+  - us-west1-c
+
+max_workers: 0
+
+head_node_type:
+  name: head_node
+  instance_type: n2-standard-16 # m5.4xlarge
+
+worker_node_types:
+  - name: worker_node
+    instance_type: n2-standard-16 # m5.4xlarge
+    min_workers: 0
+    max_workers: 0
+    use_spot: false
\ No newline at end of file

From 66d3aaf6783014a0d02c46e92ba2952ca8f2028c Mon Sep 17 00:00:00 2001
From: Jian Xiao <99709935+jianoaix@users.noreply.github.com>
Date: Tue, 11 Apr 2023 17:58:09 -0700
Subject: [PATCH 2/2] [data] Make sure the tf and tensor iteration work in dataset pipeline (#34248)

* Revert "[Datasets] Revert "Enable streaming executor by default (#32493)" (#33485)"

This reverts commit 5c7995454c03ad9ac28d50e7d339f8e8ee84c236.

* make sure tf and tensor iteration in datapipeline work
* Fix
* fix
* fix
* fix
* feedback
* feedback
* fix
---
 python/ray/data/_internal/stats.py          |  1 +
 python/ray/data/dataset_iterator.py         | 19 +++++++-------
 python/ray/data/dataset_pipeline.py         | 10 ++++---
 .../ray/data/tests/test_dataset_pipeline.py | 26 +++++++++++++++++--
 4 files changed, 41 insertions(+), 15 deletions(-)

diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py
index b903445fde37..c3627f089eda 100644
--- a/python/ray/data/_internal/stats.py
+++ b/python/ray/data/_internal/stats.py
@@ -874,6 +874,7 @@ def __init__(self, *, max_history: int = 3):
             "iter_get_s": Timer(),
             "iter_next_batch_s": Timer(),
             "iter_format_batch_s": Timer(),
+            "iter_collate_batch_s": Timer(),
             "iter_user_s": Timer(),
             "iter_total_s": Timer(),
         }
diff --git a/python/ray/data/dataset_iterator.py b/python/ray/data/dataset_iterator.py
index 2d54d5d1ed9a..f711d766ca6c 100644
--- a/python/ray/data/dataset_iterator.py
+++ b/python/ray/data/dataset_iterator.py
@@ -31,6 +31,13 @@
     from ray.data.dataset import TensorFlowTensorBatchType
 
 
+def _is_tensor_dataset(schema) -> bool:
+    """Return ``True`` if this is an iterator over a tensor dataset."""
+    if schema is None or isinstance(schema, type):
+        return False
+    return _is_tensor_schema(schema.names)
+
+
 @PublicAPI(stability="beta")
 class DataIterator(abc.ABC):
     """An iterator for reading items from a :class:`~Dataset` or
@@ -728,13 +735,14 @@ def to_tf(
         except ImportError:
             raise ValueError("tensorflow must be installed!")
 
-        if self._is_tensor_dataset():
+        schema = self.schema()
+
+        if _is_tensor_dataset(schema):
             raise NotImplementedError(
                 "`to_tf` doesn't support single-column tensor datasets. Call the "
                 "more-flexible `iter_batches` instead."
             )
 
-        schema = self.schema()
         if isinstance(schema, type):
             raise NotImplementedError(
                 "`to_tf` doesn't support simple datasets. Call `map_batches` and "
@@ -819,13 +827,6 @@ def iter_epochs(self, max_epoch: int = -1) -> None:
             "iter_torch_batches(), or to_tf()."
         )
 
-    def _is_tensor_dataset(self) -> bool:
-        """Return ``True`` if this is an iterator over a tensor dataset."""
-        schema = self.schema()
-        if schema is None or isinstance(schema, type):
-            return False
-        return _is_tensor_schema(schema.names)
-
 
 # Backwards compatibility alias.
 DatasetIterator = DataIterator
diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py
index adb6293d49d0..e1bb70059661 100644
--- a/python/ray/data/dataset_pipeline.py
+++ b/python/ray/data/dataset_pipeline.py
@@ -169,6 +169,8 @@ def gen_rows() -> Iterator[Union[T, TableRow]]:
     def iter_batches(
         self,
         *,
+        prefetch_batches: int = 1,
+        # Deprecated.
         prefetch_blocks: int = 0,
         batch_size: Optional[int] = 256,
         batch_format: Optional[str] = "default",
@@ -1071,7 +1073,7 @@ def iter_tf_batches(
         """Call
         :py:meth:`Dataset.iter_tf_batches <ray.data.Dataset.iter_tf_batches>`
         over the stream of output batches from the pipeline."""
-        return Dataset.iter_tf_batches(
+        return DataIterator.iter_tf_batches(
             self,
             prefetch_blocks=prefetch_blocks,
             batch_size=batch_size,
@@ -1097,7 +1099,7 @@ def iter_torch_batches(
         """Call
         :py:meth:`Dataset.iter_torch_batches <ray.data.Dataset.iter_torch_batches>`
         over the stream of output batches from the pipeline."""
-        return Dataset.iter_torch_batches(
+        return DataIterator.iter_torch_batches(
            self,
            prefetch_blocks=prefetch_blocks,
            batch_size=batch_size,
@@ -1122,7 +1124,7 @@ def to_tf(
     ) -> "tf.data.Dataset":
         """Call :py:meth:`Dataset.to_tf <ray.data.Dataset.to_tf>` over the stream
         of output batches from the pipeline"""
-        return Dataset.to_tf(
+        return DataIterator.to_tf(
             self,
             feature_columns=feature_columns,
             label_columns=label_columns,
@@ -1152,7 +1154,7 @@ def to_torch(
     ) -> "torch.utils.data.IterableDataset":
         """Call :py:meth:`Dataset.to_torch <ray.data.Dataset.to_torch>` over the stream
         of output batches from the pipeline"""
-        return Dataset.to_torch(
+        return DataIterator.to_torch(
             self,
             label_column=label_column,
             feature_columns=feature_columns,
diff --git a/python/ray/data/tests/test_dataset_pipeline.py b/python/ray/data/tests/test_dataset_pipeline.py
index 393aa47b3bee..644512dcd962 100644
--- a/python/ray/data/tests/test_dataset_pipeline.py
+++ b/python/ray/data/tests/test_dataset_pipeline.py
@@ -394,9 +394,31 @@ def test_iter_batches_basic(ray_start_regular_shared):
 
 
 def test_to_torch(ray_start_regular_shared):
-    pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=2)
+    pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=2).repeat(2)
     batches = list(pipe.to_torch(batch_size=None))
-    assert len(batches) == 10
+    assert len(batches) == 20
+
+
+def test_to_tf(ray_start_regular_shared):
+    ds = ray.data.range_tensor(10, shape=(1, 1, 1), parallelism=10)
+    ds = ds.add_column("label", lambda x: 1)
+    pipe = ds.window(blocks_per_window=2).repeat(2)
+    batches = list(
+        pipe.to_tf(feature_columns="__value__", label_columns="label", batch_size=None)
+    )
+    assert len(batches) == 20
+
+
+def test_iter_torch_batches(ray_start_regular_shared):
+    pipe = ray.data.range(10).repeat(2)
+    batches = list(pipe.iter_torch_batches(batch_size=1))
+    assert len(batches) == 20
+
+
+def test_iter_tf_batches(ray_start_regular_shared):
+    pipe = ray.data.range(10).repeat(2)
+    batches = list(pipe.iter_tf_batches(batch_size=1))
+    assert len(batches) == 20
 
 
 def test_iter_batches_batch_across_windows(ray_start_regular_shared):