Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use bigger fixture tree for distributed tests #4

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
13 changes: 8 additions & 5 deletions tests/func/test_dataset_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from tests.data import ENTRIES
from tests.utils import (
DEFAULT_TREE,
LARGE_TREE,
NUM_TREE,
SIMPLE_DS_QUERY_RECORDS,
TARRED_TREE,
Expand Down Expand Up @@ -1012,8 +1013,8 @@ def name_len_interrupt(_name):


@pytest.mark.parametrize(
"cloud_type,version_aware",
[("s3", True)],
"cloud_type,version_aware,tree",
[("s3", True, LARGE_TREE)],
indirect=True,
)
@pytest.mark.parametrize("batch", [False, True])
Expand All @@ -1023,7 +1024,9 @@ def name_len_interrupt(_name):
reason="Set the DATACHAIN_DISTRIBUTED environment variable "
"to test distributed UDFs",
)
def test_udf_distributed(cloud_test_catalog_tmpfile, batch, workers, datachain_job_id):
def test_udf_distributed(
cloud_test_catalog_tmpfile, batch, workers, tree, datachain_job_id
):
catalog = cloud_test_catalog_tmpfile.catalog
sources = [cloud_test_catalog_tmpfile.src_uri]
globs = [s.rstrip("/") + "/*" for s in sources]
Expand All @@ -1048,8 +1051,8 @@ def name_len_batch(names):

q = (
DatasetQuery(name="animals", version=1, catalog=catalog)
.filter(C.size < 13)
.filter(C.parent.glob("cats*") | (C.size < 4))
.filter(C.size < 90)
.filter(C.parent.glob("cats*") | (C.size > 30))
.add_signals(udf_func, parallel=2, workers=workers)
.select(C.name, C.name_len, C.blank)
)
Expand Down
12 changes: 12 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ def make_index(catalog, src: str, entries, ttl: int = 1234):
"others": {"dog4": "ruff"},
},
}

# Need to run in a distributed mode to at least have a decent amount of tasks
# Has the same structure as the DEFAULT_TREE - cats and dogs
LARGE_TREE: dict[str, Any] = {
"description": "Cats and Dogs",
"cats": {f"cat{i}": "a" * i for i in range(1, 128)},
"dogs": {
**{f"dogs{i}": "a" * i for i in range(1, 64)},
"others": {f"dogs{i}": "a" * i for i in range(64, 98)},
},
}

NUM_TREE = {f"{i:06d}": f"{i}" for i in range(1024)}


Expand Down
Loading