diff --git a/dvc/dependency/repo.py b/dvc/dependency/repo.py index 5fa8d0e7e4..417f088afe 100644 --- a/dvc/dependency/repo.py +++ b/dvc/dependency/repo.py @@ -98,12 +98,13 @@ def download(self, to: "Output", jobs: Optional[int] = None): files = super().download(to=to, jobs=jobs) if not isinstance(to.fs, LocalFileSystem): - return files + return hashes: list[tuple[str, HashInfo, dict[str, Any]]] = [] - for src_path, dest_path in files: + for src_path, dest_path, *rest in files: try: - hash_info = self.fs.info(src_path)["dvc_info"]["entry"].hash_info + info = rest[0] if rest else self.fs.info(src_path) + hash_info = info["dvc_info"]["entry"].hash_info dest_info = to.fs.info(dest_path) except (KeyError, AttributeError): # If no hash info found, just keep going and output will be hashed later @@ -112,7 +113,6 @@ def download(self, to: "Output", jobs: Optional[int] = None): hashes.append((dest_path, hash_info, dest_info)) cache = to.cache if to.use_cache else to.local_cache cache.state.save_many(hashes, to.fs) - return files def update(self, rev: Optional[str] = None): if rev: diff --git a/dvc/fs/__init__.py b/dvc/fs/__init__.py index 0c9cf567ac..4b739428c6 100644 --- a/dvc/fs/__init__.py +++ b/dvc/fs/__init__.py @@ -1,5 +1,5 @@ import glob -from typing import Optional +from typing import Optional, Union from urllib.parse import urlparse from dvc.config import ConfigError as RepoConfigError @@ -47,12 +47,24 @@ def download( fs: "FileSystem", fs_path: str, to: str, jobs: Optional[int] = None -) -> list[tuple[str, str]]: +) -> list[Union[tuple[str, str], tuple[str, str, dict]]]: from dvc.scm import lfs_prefetch from .callbacks import TqdmCallback with TqdmCallback(desc=f"Downloading {fs.name(fs_path)}", unit="files") as cb: + if isinstance(fs, DVCFileSystem): + lfs_prefetch( + fs, + [ + f"{fs.normpath(glob.escape(fs_path))}/**" + if fs.isdir(fs_path) + else glob.escape(fs_path) + ], + ) + if not glob.has_magic(fs_path): + return fs._get(fs_path, to, batch_size=jobs, 
callback=cb) + # NOTE: We use dvc-objects generic.copy over fs.get since it makes file # download atomic and avoids fsspec glob/regex path expansion. if fs.isdir(fs_path): @@ -69,15 +81,6 @@ def download( from_infos = [fs_path] to_infos = [to] - if isinstance(fs, DVCFileSystem): - lfs_prefetch( - fs, - [ - f"{fs.normpath(glob.escape(fs_path))}/**" - if fs.isdir(fs_path) - else glob.escape(fs_path) - ], - ) cb.set_size(len(from_infos)) jobs = jobs or fs.jobs generic.copy(fs, from_infos, localfs, to_infos, callback=cb, batch_size=jobs) diff --git a/dvc/fs/dvc.py b/dvc/fs/dvc.py index 19e4c04654..e1a74faee9 100644 --- a/dvc/fs/dvc.py +++ b/dvc/fs/dvc.py @@ -6,13 +6,15 @@ import threading from collections import deque from contextlib import ExitStack, suppress +from glob import has_magic from typing import TYPE_CHECKING, Any, Callable, Optional, Union -from fsspec.spec import AbstractFileSystem +from fsspec.spec import DEFAULT_CALLBACK, AbstractFileSystem from funcy import wrap_with from dvc.log import logger -from dvc_objects.fs.base import FileSystem +from dvc.utils.threadpool import ThreadPoolExecutor +from dvc_objects.fs.base import AnyFSPath, FileSystem from .data import DataFileSystem @@ -20,6 +22,8 @@ from dvc.repo import Repo from dvc.types import DictStrAny, StrPath + from .callbacks import Callback + logger = logger.getChild(__name__) RepoFactory = Union[Callable[..., "Repo"], type["Repo"]] @@ -474,9 +478,110 @@ def _info( # noqa: C901 info["name"] = path return info + def get( + self, + rpath, + lpath, + recursive=False, + callback=DEFAULT_CALLBACK, + maxdepth=None, + batch_size=None, + **kwargs, + ): + self._get( + rpath, + lpath, + recursive=recursive, + callback=callback, + maxdepth=maxdepth, + batch_size=batch_size, + **kwargs, + ) + + def _get( # noqa: C901 + self, + rpath, + lpath, + recursive=False, + callback=DEFAULT_CALLBACK, + maxdepth=None, + batch_size=None, + **kwargs, + ) -> list[Union[tuple[str, str], tuple[str, str, dict]]]: + if ( + 
isinstance(rpath, list) + or isinstance(lpath, list) + or has_magic(rpath) + or not self.exists(rpath) + or not recursive + ): + super().get( + rpath, + lpath, + recursive=recursive, + callback=callback, + maxdepth=maxdepth, + **kwargs, + ) + return [] + + if os.path.isdir(lpath) or lpath.endswith(os.path.sep): + lpath = self.join(lpath, os.path.basename(rpath)) + + if self.isfile(rpath): + with callback.branched(rpath, lpath) as child: + self.get_file(rpath, lpath, callback=child, **kwargs) + return [(rpath, lpath)] + + _files = [] + _dirs: list[str] = [] + for root, dirs, files in self.walk(rpath, maxdepth=maxdepth, detail=True): + if files: + callback.set_size((callback.size or 0) + len(files)) + + parts = self.relparts(root, rpath) + if parts in ((os.curdir,), ("",)): + parts = () + dest_root = os.path.join(lpath, *parts) + if not maxdepth or len(parts) < maxdepth - 1: + _dirs.extend(f"{dest_root}{os.path.sep}{d}" for d in dirs) + + key = self._get_key_from_relative(root) + _, dvc_fs, _ = self._get_subrepo_info(key) + + for name, info in files.items(): + src_path = f"{root}{self.sep}{name}" + dest_path = f"{dest_root}{os.path.sep}{name}" + _files.append((dvc_fs, src_path, dest_path, info)) + + os.makedirs(lpath, exist_ok=True) + for d in _dirs: + os.mkdir(d) + + def _get_file(arg): + dvc_fs, src, dest, info = arg + dvc_info = info.get("dvc_info") + if dvc_info and dvc_fs: + dvc_path = dvc_info["name"] + dvc_fs.get_file( + dvc_path, dest, callback=callback, info=dvc_info, **kwargs + ) + else: + self.get_file(src, dest, callback=callback, **kwargs) + return src, dest, info + + with ThreadPoolExecutor(max_workers=batch_size) as executor: + return list(executor.imap_unordered(_get_file, _files)) + def get_file(self, rpath, lpath, **kwargs): key = self._get_key_from_relative(rpath) fs_path = self._from_key(key) + + dirpath = os.path.dirname(lpath) + if dirpath: + # makedirs raises error if the string is empty + os.makedirs(dirpath, exist_ok=True) + try: return 
self.repo.fs.get_file(fs_path, lpath, **kwargs) except FileNotFoundError: @@ -553,6 +658,45 @@ def immutable(self): def getcwd(self): return self.fs.getcwd() + def _get( + self, + from_info: Union[AnyFSPath, list[AnyFSPath]], + to_info: Union[AnyFSPath, list[AnyFSPath]], + callback: "Callback" = DEFAULT_CALLBACK, + recursive: bool = False, + batch_size: Optional[int] = None, + **kwargs, + ) -> list[Union[tuple[str, str], tuple[str, str, dict]]]: + # FileSystem.get is non-recursive by default if arguments are lists + # otherwise, it's recursive. + recursive = not (isinstance(from_info, list) and isinstance(to_info, list)) + return self.fs._get( + from_info, + to_info, + callback=callback, + recursive=recursive, + batch_size=batch_size, + **kwargs, + ) + + def get( + self, + from_info: Union[AnyFSPath, list[AnyFSPath]], + to_info: Union[AnyFSPath, list[AnyFSPath]], + callback: "Callback" = DEFAULT_CALLBACK, + recursive: bool = False, + batch_size: Optional[int] = None, + **kwargs, + ) -> None: + self._get( + from_info, + to_info, + callback=callback, + batch_size=batch_size, + recursive=recursive, + **kwargs, + ) + @property def fsid(self) -> str: return self.fs.fsid diff --git a/tests/func/test_import.py b/tests/func/test_import.py index df78e7a699..47fdd66ccc 100644 --- a/tests/func/test_import.py +++ b/tests/func/test_import.py @@ -13,7 +13,7 @@ from dvc.testing.tmp_dir import make_subrepo from dvc.utils.fs import remove from dvc_data.hashfile import hash -from dvc_data.index.index import DataIndexDirError +from dvc_data.index.index import DataIndex, DataIndexDirError def test_import(tmp_dir, scm, dvc, erepo_dir): @@ -725,12 +725,41 @@ def test_import_invalid_configs(tmp_dir, scm, dvc, erepo_dir): ) -def test_import_no_hash(tmp_dir, scm, dvc, erepo_dir, mocker): +@pytest.mark.parametrize( + "files,expected_info_calls", + [ + ({"foo": "foo"}, {("foo",)}), + ( + { + "dir": { + "bar": "bar", + "subdir": {"lorem": "ipsum", "nested": {"lorem": "lorem"}}, + } + }, + # 
info calls should be made for only directories + {("dir",), ("dir", "subdir"), ("dir", "subdir", "nested")}, + ), + ], +) +def test_import_no_hash( + tmp_dir, scm, dvc, erepo_dir, mocker, files, expected_info_calls +): with erepo_dir.chdir(): - erepo_dir.dvc_gen("foo", "foo content", commit="create foo") - - spy = mocker.spy(hash, "file_md5") - stage = dvc.imp(os.fspath(erepo_dir), "foo", "foo_imported") - assert spy.call_count == 1 - for call in spy.call_args_list: - assert stage.outs[0].fs_path != call.args[0] + erepo_dir.dvc_gen(files, commit="create foo") + + file_md5_spy = mocker.spy(hash, "file_md5") + index_info_spy = mocker.spy(DataIndex, "info") + name = next(iter(files)) + + dvc.imp(os.fspath(erepo_dir), name, "out") + + local_hashes = [ + call.args[0] + for call in file_md5_spy.call_args_list + if call.args[1].protocol == "local" + ] + # no files should be hashed, should use existing metadata + assert not local_hashes + assert { + call.args[1] for call in index_info_spy.call_args_list + } == expected_info_calls diff --git a/tests/unit/fs/test_dvcfs.py b/tests/unit/fs/test_dvcfs.py new file mode 100644 index 0000000000..3dbc2ea7c3 --- /dev/null +++ b/tests/unit/fs/test_dvcfs.py @@ -0,0 +1,805 @@ +import os +import posixpath +from hashlib import md5 +from itertools import product + +import pytest +from fsspec.implementations.local import LocalFileSystem, make_path_posix +from fsspec.tests.abstract.common import GLOB_EDGE_CASES_TESTS + +from dvc.api import DVCFileSystem + + +class DVCFixtures: + """The fixtures imitate the fsspec.tests.abstract.AbstractFixtures. + + This has been modified to use dvc's fixture, as DVCFileSystem is a read-only + filesystem, and cannot be used to create directories or files. + + The `Output.ignore()` is mocked to avoid `.gitignore` files in the directories, + as we can reuse the tests from fsspec with minimal modifications. + `.gitignore` file is manually created with required patterns at the root of the + repository. 
+ """ + + @pytest.fixture + def fs_bulk_operations_scenario_0(self, tmp_dir): + """ + Scenario that is used for many cp/get/put tests. Creates the following + directory and file structure: + + 📁 source + ├── 📄 file1 + ├── 📄 file2 + └── 📁 subdir + ├── 📄 subfile1 + ├── 📄 subfile2 + └── 📁 nesteddir + └── 📄 nestedfile + """ + source = tmp_dir / "source" + source.mkdir() + tmp_dir.scm_gen( + ".gitignore", "/source/file2/\nsource/subdir", commit="add .gitignore" + ) + + tmp_dir.scm_gen("source/file1", "file1", commit="add file1") + tmp_dir.dvc_gen("source/file2", "file2", commit="add file2") + tmp_dir.dvc_gen( + { + "source/subdir": { + "subfile1": "subfile1", + "subfile2": "subfile2", + "nesteddir": {"nestedfile": "nestedfile"}, + } + }, + commit="add subdir", + ) + return "/source" + + @pytest.fixture + def fs_10_files_with_hashed_names(self, tmp_dir, local_fs, local_join, local_path): + """ + Scenario that is used to check cp/get/put files order when source and + destination are lists. Creates the following directory and file structure: + + 📁 source + └── 📄 {hashed([0-9])}.txt + """ + dir_contents = { + md5(str(i).encode("utf-8")).hexdigest() + ".txt": str(i) for i in range(10) + } + tmp_dir.dvc_gen({"source": dir_contents}, commit="add source") + tmp_dir.scm_gen(".gitignore", "/source", commit="add .gitignore") + return "/source" + + @pytest.fixture + def src_directory(self, tmp_dir): + # https://github.com/fsspec/filesystem_spec/issues/1062 + # Recursive cp/get/put of source directory into non-existent target directory. + tmp_dir.dvc_gen({"src": {"file": "file"}}, commit="add source") + return "/src" + + @pytest.fixture + def fs_dir_and_file_with_same_name_prefix(self, tmp_dir): + """ + Scenario that is used to check cp/get/put on directory and file with + the same name prefixes. 
Creates the following directory and file structure: + + 📁 source + ├── 📄 subdir.txt + └── 📁 subdir + └── 📄 subfile.txt + """ + source = tmp_dir / "source" + source.mkdir() + + tmp_dir.scm_gen(".gitignore", "/source/subdir", commit="add .gitignore") + tmp_dir.scm_gen("source/subdir.txt", "subdir.txt", commit="add subdir.txt") + tmp_dir.dvc_gen( + {"source/subdir": {"subfile.txt": "subfile.txt"}}, commit="add subdir" + ) + return "/source" + + @pytest.fixture + def fs_glob_edge_cases_files(self, tmp_dir): + """ + Scenario that is used for glob edge cases cp/get/put tests. + Creates the following directory and file structure: + + 📁 source + ├── 📄 file1 + ├── 📄 file2 + ├── 📁 subdir0 + │ ├── 📄 subfile1 + │ ├── 📄 subfile2 + │ └── 📁 nesteddir + │ └── 📄 nestedfile + └── 📁 subdir1 + ├── 📄 subfile1 + ├── 📄 subfile2 + └── 📁 nesteddir + └── 📄 nestedfile + """ + source = tmp_dir / "source" + source.mkdir() + + tmp_dir.scm_gen( + ".gitignore", "/source/file1\n/source/subdir1", commit="add .gitignore" + ) + tmp_dir.scm_gen("source/file1", "file1", commit="add file1") + tmp_dir.dvc_gen("source/file2", "file2", commit="add file2") + + dir_contents = { + "subfile1": "subfile1", + "subfile2": "subfile2", + "nesteddir": {"nestedfile": "nestedfile"}, + } + tmp_dir.scm_gen({"source/subdir0": dir_contents}, commit="add subdir0") + tmp_dir.dvc_gen({"source/subdir1": dir_contents}, commit="add subdir1") + return "/source" + + @pytest.fixture(params=[{"rev": "HEAD"}, {}]) + def fs(self, request, tmp_dir, dvc, scm): + return DVCFileSystem(tmp_dir, **request.param) + + @pytest.fixture(autouse=True) + def mock_ignore(self, mocker): + mocker.patch("dvc.output.Output.ignore") + + @pytest.fixture + def fs_join(self): + return posixpath.join + + @pytest.fixture + def fs_path(self, fs): + return fs.root_marker + + @pytest.fixture(scope="class") + def local_fs(self): + # Maybe need an option for auto_mkdir=False? This is only relevant + # for certain implementations. 
+ return LocalFileSystem(auto_mkdir=True) + + @pytest.fixture + def local_join(self): + """ + Return a function that joins its arguments together into a path, on + the local filesystem. + """ + return os.path.join + + @pytest.fixture + def local_path(self, tmpdir): + return tmpdir + + @pytest.fixture + def local_target(self, local_fs, local_join, local_path): + """ + Return name of local directory that does not yet exist to copy into. + + Cleans up at the end of each test in which it is used. + """ + target = local_join(local_path, "target") + yield target + if local_fs.exists(target): + local_fs.rm(target, recursive=True) + + +class TestDVCFileSystemGet(DVCFixtures): + """ + This test is adapted from `fsspec.tests.abstract.get.AbstractGetTests` + with minor modifications to work with DVCFixtures and DVCFileSystem. + """ + + def test_get_file_to_existing_directory( + self, + fs, + fs_join, + fs_bulk_operations_scenario_0, + local_fs, + local_join, + local_target, + ): + # Copy scenario 1a + source = fs_bulk_operations_scenario_0 + + target = local_target + local_fs.mkdir(target) + assert local_fs.isdir(target) + + target_file2 = local_join(target, "file2") + target_subfile1 = local_join(target, "subfile1") + + # Copy from source directory + fs.get(fs_join(source, "file2"), target) + assert local_fs.isfile(target_file2) + + # Copy from sub directory + fs.get(fs_join(source, "subdir", "subfile1"), target) + assert local_fs.isfile(target_subfile1) + + # Remove copied files + local_fs.rm([target_file2, target_subfile1]) + assert not local_fs.exists(target_file2) + assert not local_fs.exists(target_subfile1) + + # Repeat with trailing slash on target + fs.get(fs_join(source, "file2"), target + "/") + assert local_fs.isdir(target) + assert local_fs.isfile(target_file2) + + fs.get(fs_join(source, "subdir", "subfile1"), target + "/") + assert local_fs.isfile(target_subfile1) + + def test_get_file_to_new_directory( + self, + fs, + fs_join, + fs_bulk_operations_scenario_0, +
local_fs, + local_join, + local_target, + ): + # Copy scenario 1b + source = fs_bulk_operations_scenario_0 + + target = local_target + local_fs.mkdir(target) + + fs.get( + fs_join(source, "subdir", "subfile1"), local_join(target, "newdir/") + ) # Note trailing slash + + assert local_fs.isdir(target) + assert local_fs.isdir(local_join(target, "newdir")) + assert local_fs.isfile(local_join(target, "newdir", "subfile1")) + + def test_get_file_to_file_in_existing_directory( + self, + fs, + fs_join, + fs_bulk_operations_scenario_0, + local_fs, + local_join, + local_target, + ): + # Copy scenario 1c + source = fs_bulk_operations_scenario_0 + + target = local_target + local_fs.mkdir(target) + + fs.get(fs_join(source, "subdir", "subfile1"), local_join(target, "newfile")) + assert local_fs.isfile(local_join(target, "newfile")) + + def test_get_file_to_file_in_new_directory( + self, + fs, + fs_join, + fs_bulk_operations_scenario_0, + local_fs, + local_join, + local_target, + ): + # Copy scenario 1d + source = fs_bulk_operations_scenario_0 + + target = local_target + local_fs.mkdir(target) + + fs.get( + fs_join(source, "subdir", "subfile1"), + local_join(target, "newdir", "newfile"), + ) + assert local_fs.isdir(local_join(target, "newdir")) + assert local_fs.isfile(local_join(target, "newdir", "newfile")) + + def test_get_directory_to_existing_directory( + self, + fs, + fs_join, + fs_bulk_operations_scenario_0, + local_fs, + local_join, + local_target, + ): + # Copy scenario 1e + source = fs_bulk_operations_scenario_0 + + target = local_target + local_fs.mkdir(target) + assert local_fs.isdir(target) + + for source_slash, target_slash in zip([False, True], [False, True]): + s = fs_join(source, "subdir") + if source_slash: + s += "/" + t = target + "/" if target_slash else target + + # Without recursive does nothing + fs.get(s, t) + assert local_fs.ls(target) == [] + + # With recursive + fs.get(s, t, recursive=True) + if source_slash: + assert local_fs.isfile(local_join(target, 
"subfile1")) + assert local_fs.isfile(local_join(target, "subfile2")) + assert local_fs.isdir(local_join(target, "nesteddir")) + assert local_fs.isfile(local_join(target, "nesteddir", "nestedfile")) + assert not local_fs.exists(local_join(target, "subdir")) + + local_fs.rm( + [ + local_join(target, "subfile1"), + local_join(target, "subfile2"), + local_join(target, "nesteddir"), + ], + recursive=True, + ) + else: + assert local_fs.isdir(local_join(target, "subdir")) + assert local_fs.isfile(local_join(target, "subdir", "subfile1")) + assert local_fs.isfile(local_join(target, "subdir", "subfile2")) + assert local_fs.isdir(local_join(target, "subdir", "nesteddir")) + assert local_fs.isfile( + local_join(target, "subdir", "nesteddir", "nestedfile") + ) + + local_fs.rm(local_join(target, "subdir"), recursive=True) + assert local_fs.ls(target) == [] + + # Limit recursive by maxdepth + fs.get(s, t, recursive=True, maxdepth=1) + if source_slash: + assert local_fs.isfile(local_join(target, "subfile1")) + assert local_fs.isfile(local_join(target, "subfile2")) + assert not local_fs.exists(local_join(target, "nesteddir")) + assert not local_fs.exists(local_join(target, "subdir")) + + local_fs.rm( + [ + local_join(target, "subfile1"), + local_join(target, "subfile2"), + ], + recursive=True, + ) + else: + assert local_fs.isdir(local_join(target, "subdir")) + assert local_fs.isfile(local_join(target, "subdir", "subfile1")) + assert local_fs.isfile(local_join(target, "subdir", "subfile2")) + assert not local_fs.exists(local_join(target, "subdir", "nesteddir")) + + local_fs.rm(local_join(target, "subdir"), recursive=True) + assert local_fs.ls(target) == [] + + def test_get_directory_to_new_directory( + self, + fs, + fs_join, + fs_bulk_operations_scenario_0, + local_fs, + local_join, + local_target, + ): + # Copy scenario 1f + source = fs_bulk_operations_scenario_0 + + target = local_target + local_fs.mkdir(target) + + for source_slash, target_slash in zip([False, True], [False, 
True]): + s = fs_join(source, "subdir") + if source_slash: + s += "/" + t = local_join(target, "newdir") + if target_slash: + t += "/" + + # Without recursive does nothing + fs.get(s, t) + assert local_fs.ls(target) == [] + + # With recursive + fs.get(s, t, recursive=True) + assert local_fs.isdir(local_join(target, "newdir")) + assert local_fs.isfile(local_join(target, "newdir", "subfile1")) + assert local_fs.isfile(local_join(target, "newdir", "subfile2")) + assert local_fs.isdir(local_join(target, "newdir", "nesteddir")) + assert local_fs.isfile( + local_join(target, "newdir", "nesteddir", "nestedfile") + ) + assert not local_fs.exists(local_join(target, "subdir")) + + local_fs.rm(local_join(target, "newdir"), recursive=True) + assert local_fs.ls(target) == [] + + # Limit recursive by maxdepth + fs.get(s, t, recursive=True, maxdepth=1) + assert local_fs.isdir(local_join(target, "newdir")) + assert local_fs.isfile(local_join(target, "newdir", "subfile1")) + assert local_fs.isfile(local_join(target, "newdir", "subfile2")) + assert not local_fs.exists(local_join(target, "newdir", "nesteddir")) + assert not local_fs.exists(local_join(target, "subdir")) + + local_fs.rm(local_join(target, "newdir"), recursive=True) + assert not local_fs.exists(local_join(target, "newdir")) + + def test_get_glob_to_existing_directory( + self, + fs, + fs_join, + fs_bulk_operations_scenario_0, + local_fs, + local_join, + local_target, + ): + # Copy scenario 1g + source = fs_bulk_operations_scenario_0 + + target = local_target + local_fs.mkdir(target) + + for target_slash in [False, True]: + t = target + "/" if target_slash else target + + # Without recursive + fs.get(fs_join(source, "subdir", "*"), t) + assert local_fs.isfile(local_join(target, "subfile1")) + assert local_fs.isfile(local_join(target, "subfile2")) + assert not local_fs.isdir(local_join(target, "nesteddir")) + assert not local_fs.exists(local_join(target, "nesteddir", "nestedfile")) + assert not 
local_fs.exists(local_join(target, "subdir")) + + local_fs.rm( + [ + local_join(target, "subfile1"), + local_join(target, "subfile2"), + ], + recursive=True, + ) + assert local_fs.ls(target) == [] + + # With recursive + for glob, recursive in zip(["*", "**"], [True, False]): + fs.get(fs_join(source, "subdir", glob), t, recursive=recursive) + assert local_fs.isfile(local_join(target, "subfile1")) + assert local_fs.isfile(local_join(target, "subfile2")) + assert local_fs.isdir(local_join(target, "nesteddir")) + assert local_fs.isfile(local_join(target, "nesteddir", "nestedfile")) + assert not local_fs.exists(local_join(target, "subdir")) + + local_fs.rm( + [ + local_join(target, "subfile1"), + local_join(target, "subfile2"), + local_join(target, "nesteddir"), + ], + recursive=True, + ) + assert local_fs.ls(target) == [] + + # Limit recursive by maxdepth + fs.get( + fs_join(source, "subdir", glob), t, recursive=recursive, maxdepth=1 + ) + assert local_fs.isfile(local_join(target, "subfile1")) + assert local_fs.isfile(local_join(target, "subfile2")) + assert not local_fs.exists(local_join(target, "nesteddir")) + assert not local_fs.exists(local_join(target, "subdir")) + + local_fs.rm( + [ + local_join(target, "subfile1"), + local_join(target, "subfile2"), + ], + recursive=True, + ) + assert local_fs.ls(target) == [] + + def test_get_glob_to_new_directory( + self, + fs, + fs_join, + fs_bulk_operations_scenario_0, + local_fs, + local_join, + local_target, + ): + # Copy scenario 1h + source = fs_bulk_operations_scenario_0 + + target = local_target + local_fs.mkdir(target) + + for target_slash in [False, True]: + t = fs_join(target, "newdir") + if target_slash: + t += "/" + + # Without recursive + fs.get(fs_join(source, "subdir", "*"), t) + assert local_fs.isdir(local_join(target, "newdir")) + assert local_fs.isfile(local_join(target, "newdir", "subfile1")) + assert local_fs.isfile(local_join(target, "newdir", "subfile2")) + assert not local_fs.exists(local_join(target, 
"newdir", "nesteddir")) + assert not local_fs.exists( + local_join(target, "newdir", "nesteddir", "nestedfile") + ) + assert not local_fs.exists(local_join(target, "subdir")) + assert not local_fs.exists(local_join(target, "newdir", "subdir")) + + local_fs.rm(local_join(target, "newdir"), recursive=True) + assert local_fs.ls(target) == [] + + # With recursive + for glob, recursive in zip(["*", "**"], [True, False]): + fs.get(fs_join(source, "subdir", glob), t, recursive=recursive) + assert local_fs.isdir(local_join(target, "newdir")) + assert local_fs.isfile(local_join(target, "newdir", "subfile1")) + assert local_fs.isfile(local_join(target, "newdir", "subfile2")) + assert local_fs.isdir(local_join(target, "newdir", "nesteddir")) + assert local_fs.isfile( + local_join(target, "newdir", "nesteddir", "nestedfile") + ) + assert not local_fs.exists(local_join(target, "subdir")) + assert not local_fs.exists(local_join(target, "newdir", "subdir")) + + local_fs.rm(local_join(target, "newdir"), recursive=True) + assert not local_fs.exists(local_join(target, "newdir")) + + # Limit recursive by maxdepth + fs.get( + fs_join(source, "subdir", glob), t, recursive=recursive, maxdepth=1 + ) + assert local_fs.isdir(local_join(target, "newdir")) + assert local_fs.isfile(local_join(target, "newdir", "subfile1")) + assert local_fs.isfile(local_join(target, "newdir", "subfile2")) + assert not local_fs.exists(local_join(target, "newdir", "nesteddir")) + assert not local_fs.exists(local_join(target, "subdir")) + assert not local_fs.exists(local_join(target, "newdir", "subdir")) + + local_fs.rm(local_fs.ls(target, detail=False), recursive=True) + assert not local_fs.exists(local_join(target, "newdir")) + + @pytest.mark.parametrize( + GLOB_EDGE_CASES_TESTS["argnames"], + GLOB_EDGE_CASES_TESTS["argvalues"], + ) + def test_get_glob_edge_cases( + self, + path, + recursive, + maxdepth, + expected, + fs, + fs_join, + fs_glob_edge_cases_files, + local_fs, + local_join, + local_target, + ): + # 
Copy scenario 1g + source = fs_glob_edge_cases_files + + target = local_target + + for new_dir, target_slash in product([True, False], [True, False]): + local_fs.mkdir(target) + + t = local_join(target, "newdir") if new_dir else target + t = t + "/" if target_slash else t + + fs.get(fs_join(source, path), t, recursive=recursive, maxdepth=maxdepth) + + output = local_fs.find(target) + if new_dir: + prefixed_expected = [ + make_path_posix(local_join(target, "newdir", p)) for p in expected + ] + else: + prefixed_expected = [ + make_path_posix(local_join(target, p)) for p in expected + ] + assert sorted(output) == sorted(prefixed_expected) + + try: + local_fs.rm(target, recursive=True) + except FileNotFoundError: + pass + + def test_get_list_of_files_to_existing_directory( + self, + fs, + fs_join, + fs_bulk_operations_scenario_0, + local_fs, + local_join, + local_target, + ): + # Copy scenario 2a + source = fs_bulk_operations_scenario_0 + + target = local_target + local_fs.mkdir(target) + + source_files = [ + fs_join(source, "file1"), + fs_join(source, "file2"), + fs_join(source, "subdir", "subfile1"), + ] + + for target_slash in [False, True]: + t = target + "/" if target_slash else target + + fs.get(source_files, t) + assert local_fs.isfile(local_join(target, "file1")) + assert local_fs.isfile(local_join(target, "file2")) + assert local_fs.isfile(local_join(target, "subfile1")) + + local_fs.rm( + [ + local_join(target, "file1"), + local_join(target, "file2"), + local_join(target, "subfile1"), + ], + recursive=True, + ) + assert local_fs.ls(target) == [] + + def test_get_list_of_files_to_new_directory( + self, + fs, + fs_join, + fs_bulk_operations_scenario_0, + local_fs, + local_join, + local_target, + ): + # Copy scenario 2b + source = fs_bulk_operations_scenario_0 + + target = local_target + local_fs.mkdir(target) + + source_files = [ + fs_join(source, "file1"), + fs_join(source, "file2"), + fs_join(source, "subdir", "subfile1"), + ] + + fs.get(source_files, 
local_join(target, "newdir") + "/") # Note trailing slash + assert local_fs.isdir(local_join(target, "newdir")) + assert local_fs.isfile(local_join(target, "newdir", "file1")) + assert local_fs.isfile(local_join(target, "newdir", "file2")) + assert local_fs.isfile(local_join(target, "newdir", "subfile1")) + + def test_get_directory_recursive( + self, src_directory, fs, fs_join, fs_path, local_fs, local_join, local_target + ): + target = local_target + src = src_directory + + # get without slash + assert not local_fs.exists(target) + for loop in range(2): + fs.get(src, target, recursive=True) + assert local_fs.isdir(target) + + if loop == 0: + assert local_fs.isfile(local_join(target, "file")) + assert not local_fs.exists(local_join(target, "src")) + else: + assert local_fs.isfile(local_join(target, "file")) + assert local_fs.isdir(local_join(target, "src")) + assert local_fs.isfile(local_join(target, "src", "file")) + + local_fs.rm(target, recursive=True) + + # get with slash + assert not local_fs.exists(target) + for _ in range(2): + fs.get(src + "/", target, recursive=True) + assert local_fs.isdir(target) + assert local_fs.isfile(local_join(target, "file")) + assert not local_fs.exists(local_join(target, "src")) + + def test_get_directory_without_files_with_same_name_prefix( + self, + fs, + fs_join, + local_fs, + local_join, + local_target, + fs_dir_and_file_with_same_name_prefix, + ): + # Create the test dirs + source = fs_dir_and_file_with_same_name_prefix + target = local_target + + # Test without glob + fs.get(fs_join(source, "subdir"), target, recursive=True) + + assert local_fs.isfile(local_join(target, "subfile.txt")) + assert not local_fs.isfile(local_join(target, "subdir.txt")) + + local_fs.rm([local_join(target, "subfile.txt")]) + assert local_fs.ls(target) == [] + + # Test with glob + fs.get(fs_join(source, "subdir*"), target, recursive=True) + + assert local_fs.isdir(local_join(target, "subdir")) + assert local_fs.isfile(local_join(target, "subdir", 
"subfile.txt")) + assert local_fs.isfile(local_join(target, "subdir.txt")) + + def test_get_with_source_and_destination_as_list( + self, + fs, + fs_join, + local_fs, + local_join, + local_target, + fs_10_files_with_hashed_names, + ): + # Create the test dir + source = fs_10_files_with_hashed_names + target = local_target + + # Create list of files for source and destination + source_files = [] + destination_files = [] + for i in range(10): + hashed_i = md5(str(i).encode("utf-8")).hexdigest() + source_files.append(fs_join(source, f"{hashed_i}.txt")) + destination_files.append( + make_path_posix(local_join(target, f"{hashed_i}.txt")) + ) + + # Copy and assert order was kept + fs.get(rpath=source_files, lpath=destination_files) + + for i in range(10): + file_content = local_fs.cat(destination_files[i]).decode("utf-8") + assert file_content == str(i) + + +def test_maxdepth(tmp_dir, dvc, scm): + tmp_dir.dvc_gen( + { + "dir": { + "file1": "file1", + "subdir": { + "file2": "file2", + "subdir2": {"file3": "file3", "subdir3": {"file4": "file4"}}, + }, + } + }, + commit="add dir", + ) + + fs = DVCFileSystem(url=tmp_dir) + fs.get("dir", "dir1", recursive=True, maxdepth=1) + assert (tmp_dir / "dir1").read_text() == {"file1": "file1"} + + fs.get("dir", "dir2", recursive=True, maxdepth=2) + assert (tmp_dir / "dir2").read_text() == { + "file1": "file1", + "subdir": {"file2": "file2"}, + } + + fs.get("dir", "dir3", recursive=True, maxdepth=3) + assert (tmp_dir / "dir3").read_text() == { + "file1": "file1", + "subdir": {"file2": "file2", "subdir2": {"file3": "file3"}}, + } + + fs.get("dir", "dir4", recursive=True, maxdepth=4) + assert (tmp_dir / "dir4").read_text() == { + "file1": "file1", + "subdir": { + "file2": "file2", + "subdir2": {"file3": "file3", "subdir3": {"file4": "file4"}}, + }, + }