diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index b903445fde377..c3627f089edaa 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -874,6 +874,7 @@ def __init__(self, *, max_history: int = 3): "iter_get_s": Timer(), "iter_next_batch_s": Timer(), "iter_format_batch_s": Timer(), + "iter_collate_batch_s": Timer(), "iter_user_s": Timer(), "iter_total_s": Timer(), } diff --git a/python/ray/data/dataset_iterator.py b/python/ray/data/dataset_iterator.py index 2d54d5d1ed9a9..f711d766ca6cd 100644 --- a/python/ray/data/dataset_iterator.py +++ b/python/ray/data/dataset_iterator.py @@ -31,6 +31,13 @@ from ray.data.dataset import TensorFlowTensorBatchType +def _is_tensor_dataset(schema) -> bool: + """Return ``True`` if this is an iterator over a tensor dataset.""" + if schema is None or isinstance(schema, type): + return False + return _is_tensor_schema(schema.names) + + @PublicAPI(stability="beta") class DataIterator(abc.ABC): """An iterator for reading items from a :class:`~Dataset` or @@ -728,13 +735,14 @@ def to_tf( except ImportError: raise ValueError("tensorflow must be installed!") - if self._is_tensor_dataset(): + schema = self.schema() + + if _is_tensor_dataset(schema): raise NotImplementedError( "`to_tf` doesn't support single-column tensor datasets. Call the " "more-flexible `iter_batches` instead." ) - schema = self.schema() if isinstance(schema, type): raise NotImplementedError( "`to_tf` doesn't support simple datasets. Call `map_batches` and " @@ -819,13 +827,6 @@ def iter_epochs(self, max_epoch: int = -1) -> None: "iter_torch_batches(), or to_tf()." ) - def _is_tensor_dataset(self) -> bool: - """Return ``True`` if this is an iterator over a tensor dataset.""" - schema = self.schema() - if schema is None or isinstance(schema, type): - return False - return _is_tensor_schema(schema.names) - # Backwards compatibility alias. DatasetIterator = DataIterator diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index adb6293d49d0a..e1bb70059661d 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -169,6 +169,8 @@ def gen_rows() -> Iterator[Union[T, TableRow]]: def iter_batches( self, *, + prefetch_batches: int = 1, + # Deprecated. prefetch_blocks: int = 0, batch_size: Optional[int] = 256, batch_format: Optional[str] = "default", @@ -1071,7 +1073,7 @@ def iter_tf_batches( """Call :py:meth:`Dataset.iter_tf_batches ` over the stream of output batches from the pipeline.""" - return Dataset.iter_tf_batches( + return DataIterator.iter_tf_batches( self, prefetch_blocks=prefetch_blocks, batch_size=batch_size, @@ -1097,7 +1099,7 @@ def iter_torch_batches( """Call :py:meth:`Dataset.iter_torch_batches ` over the stream of output batches from the pipeline.""" - return Dataset.iter_torch_batches( + return DataIterator.iter_torch_batches( self, prefetch_blocks=prefetch_blocks, batch_size=batch_size, @@ -1122,7 +1124,7 @@ def to_tf( ) -> "tf.data.Dataset": """Call :py:meth:`Dataset.to_tf ` over the stream of output batches from the pipeline""" - return Dataset.to_tf( + return DataIterator.to_tf( self, feature_columns=feature_columns, label_columns=label_columns, @@ -1152,7 +1154,7 @@ def to_torch( ) -> "torch.utils.data.IterableDataset": """Call :py:meth:`Dataset.to_torch ` over the stream of output batches from the pipeline""" - return Dataset.to_torch( + return DataIterator.to_torch( self, label_column=label_column, feature_columns=feature_columns, diff --git a/python/ray/data/tests/test_dataset_pipeline.py b/python/ray/data/tests/test_dataset_pipeline.py index 393aa47b3bee8..644512dcd9628 100644 --- a/python/ray/data/tests/test_dataset_pipeline.py +++ b/python/ray/data/tests/test_dataset_pipeline.py @@ -394,9 +394,31 @@ def test_iter_batches_basic(ray_start_regular_shared): def test_to_torch(ray_start_regular_shared): - pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=2) + pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=2).repeat(2) batches = list(pipe.to_torch(batch_size=None)) - assert len(batches) == 10 + assert len(batches) == 20 + + +def test_to_tf(ray_start_regular_shared): + ds = ray.data.range_tensor(10, shape=(1, 1, 1), parallelism=10) + ds = ds.add_column("label", lambda x: 1) + pipe = ds.window(blocks_per_window=2).repeat(2) + batches = list( + pipe.to_tf(feature_columns="__value__", label_columns="label", batch_size=None) + ) + assert len(batches) == 20 + + +def test_iter_torch_batches(ray_start_regular_shared): + pipe = ray.data.range(10).repeat(2) + batches = list(pipe.iter_torch_batches(batch_size=1)) + assert len(batches) == 20 + + +def test_iter_tf_batches(ray_start_regular_shared): + pipe = ray.data.range(10).repeat(2) + batches = list(pipe.iter_tf_batches(batch_size=1)) + assert len(batches) == 20 def test_iter_batches_batch_across_windows(ray_start_regular_shared): diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 5caf5597dcf9a..8c4efd187cbd7 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -2713,6 +2713,19 @@ alert: default + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + smoke_test: + frequency: manual + run: + timeout: 7200 + cluster: + cluster_env: app_config.yaml + cluster_compute: compute_tpl_single_node_gce.yaml + # - name: serve_serve_micro_benchmark_k8s # group: Serve tests # working_dir: serve_tests @@ -2752,6 +2765,19 @@ alert: default stable: False + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + smoke_test: + frequency: manual + run: + timeout: 3600 + cluster: + cluster_env: app_config.yaml + cluster_compute: compute_tpl_single_node_32_cpu_gce.yaml + - name: deployment_graph_wide_ensemble group: Serve tests working_dir: serve_tests @@ -2771,6 +2797,19 @@ alert: default stable: False + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + smoke_test: + frequency: manual + run: + timeout: 3600 + cluster: + cluster_env: app_config.yaml + cluster_compute: compute_tpl_single_node_32_cpu_gce.yaml + - name: serve_handle_long_chain group: Serve tests working_dir: serve_tests @@ -2790,6 +2829,19 @@ alert: default stable: False + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + smoke_test: + frequency: manual + run: + timeout: 3600 + cluster: + cluster_env: app_config.yaml + cluster_compute: compute_tpl_single_node_32_cpu_gce.yaml + - name: serve_handle_wide_ensemble group: Serve tests working_dir: serve_tests @@ -2809,6 +2861,19 @@ alert: default stable: False + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + smoke_test: + frequency: manual + run: + timeout: 3600 + cluster: + cluster_env: app_config.yaml + cluster_compute: compute_tpl_single_node_32_cpu_gce.yaml + - name: serve_micro_protocol_grpc_benchmark group: Serve tests @@ -2828,6 +2893,19 @@ alert: default + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + smoke_test: + frequency: manual + run: + timeout: 7200 + cluster: + cluster_env: app_config.yaml + cluster_compute: compute_tpl_single_node_gce.yaml + - name: serve_micro_protocol_http_benchmark group: Serve tests working_dir: serve_tests @@ -2846,6 +2924,19 @@ alert: default + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + smoke_test: + frequency: manual + run: + timeout: 7200 + cluster: + cluster_env: app_config.yaml + cluster_compute: compute_tpl_single_node_gce.yaml + - name: serve_resnet_benchmark group: Serve tests @@ -2865,6 +2956,19 @@ alert: default + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + smoke_test: + frequency: manual + run: + timeout: 7200 + cluster: + cluster_env: gpu_app_config.yaml + cluster_compute: compute_tpl_gpu_node_gce.yaml + ######################## # Train tests ######################## diff --git a/release/serve_tests/compute_tpl_32_cpu_gce.yaml b/release/serve_tests/compute_tpl_32_cpu_gce.yaml index e4cc0e474f598..26a2474f3af33 100644 --- a/release/serve_tests/compute_tpl_32_cpu_gce.yaml +++ b/release/serve_tests/compute_tpl_32_cpu_gce.yaml @@ -11,7 +11,7 @@ head_node_type: worker_node_types: - name: worker_node - instance_type: n2-standard-32 # m5.4xlarge + instance_type: n2-standard-32 # m5.8xlarge min_workers: 32 max_workers: 32 use_spot: false \ No newline at end of file diff --git a/release/serve_tests/compute_tpl_gpu_node_gce.yaml b/release/serve_tests/compute_tpl_gpu_node_gce.yaml new file mode 100644 index 0000000000000..9341c01b88992 --- /dev/null +++ b/release/serve_tests/compute_tpl_gpu_node_gce.yaml @@ -0,0 +1,17 @@ +cloud_id: {{env["ANYSCALE_CLOUD_ID"]}} +region: us-west1 +allowed_azs: + - us-west1-b + +max_workers: 32 + +head_node_type: + name: head_node + instance_type: n1-standard-16-nvidia-tesla-t4-1 # g4dn.4xlarge + +worker_node_types: + - name: worker_node + instance_type: n2-standard-16 # m5.4xlarge + min_workers: 0 + max_workers: 1 + use_spot: false \ No newline at end of file diff --git a/release/serve_tests/compute_tpl_single_node_32_cpu_gce.yaml b/release/serve_tests/compute_tpl_single_node_32_cpu_gce.yaml new file mode 100644 index 0000000000000..c3171d81fac9c --- /dev/null +++ b/release/serve_tests/compute_tpl_single_node_32_cpu_gce.yaml @@ -0,0 +1,17 @@ +cloud_id: {{env["ANYSCALE_CLOUD_ID"]}} +region: us-west1 +allowed_azs: + - us-west1-c + +max_workers: 0 + +head_node_type: + name: head_node + instance_type: n2-standard-32 # m5.8xlarge + +worker_node_types: + - name: worker_node + instance_type: n2-standard-32 # m5.8xlarge + min_workers: 0 + max_workers: 0 + use_spot: false \ No newline at end of file diff --git a/release/serve_tests/compute_tpl_single_node_gce.yaml b/release/serve_tests/compute_tpl_single_node_gce.yaml new file mode 100644 index 0000000000000..7729adaa517b7 --- /dev/null +++ b/release/serve_tests/compute_tpl_single_node_gce.yaml @@ -0,0 +1,17 @@ +cloud_id: {{env["ANYSCALE_CLOUD_ID"]}} +region: us-west1 +allowed_azs: + - us-west1-c + +max_workers: 0 + +head_node_type: + name: head_node + instance_type: n2-standard-16 # m5.4xlarge + +worker_node_types: + - name: worker_node + instance_type: n2-standard-16 # m5.4xlarge + min_workers: 0 + max_workers: 0 + use_spot: false \ No newline at end of file