Skip to content

Commit

Permalink
fix(signed url): pass version id down to fsspec via path
Browse files Browse the repository at this point in the history
  • Loading branch information
shcheklein committed Dec 28, 2024
1 parent 1bd7f8b commit 4333400
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 3 deletions.
10 changes: 8 additions & 2 deletions src/datachain/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -1236,10 +1236,16 @@ def ls_dataset_rows(

return q.to_db_records()

def signed_url(self, source: str, path: str, client_config=None) -> str:
def signed_url(
self,
source: str,
path: str,
version_id: Optional[str] = None,
client_config=None,
) -> str:
client_config = client_config or self.client_config
client = Client.get_client(source, self.cache, **client_config)
return client.url(path)
return client.url(path, version_id=version_id)

Check warning on line 1248 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L1248

Added line #L1248 was not covered by tests

def export_dataset_table(
self,
Expand Down
6 changes: 5 additions & 1 deletion src/datachain/client/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,11 @@ def fs(self) -> "AbstractFileSystem":
return self._fs

def url(self, path: str, expires: int = 3600, **kwargs) -> str:
return self.fs.sign(self.get_full_path(path), expiration=expires, **kwargs)
return self.fs.sign(

Check warning on line 205 in src/datachain/client/fsspec.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/client/fsspec.py#L205

Added line #L205 was not covered by tests
self.get_full_path(path, kwargs.pop("version_id", None)),
expiration=expires,
**kwargs,
)

async def get_current_etag(self, file: "File") -> str:
kwargs = {}
Expand Down
27 changes: 27 additions & 0 deletions src/datachain/client/s3.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from typing import Any, Optional, cast
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit

from botocore.exceptions import NoCredentialsError
from s3fs import S3FileSystem
Expand Down Expand Up @@ -121,6 +122,32 @@ def _entry_from_boto(self, v, bucket, versions=False) -> File:
size=v["Size"],
)

@classmethod
def _split_version(cls, path: str) -> tuple[str, Optional[str]]:
parts = list(urlsplit(path))
query = parse_qs(parts[3])
if "versionId" in query:
version_id = query["versionId"][0]
del query["versionId"]
parts[3] = urlencode(query)

Check warning on line 132 in src/datachain/client/s3.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/client/s3.py#L130-L132

Added lines #L130 - L132 were not covered by tests
else:
version_id = None
return urlunsplit(parts), version_id

@classmethod
def _join_version(cls, path: str, version_id: Optional[str]) -> str:
parts = list(urlsplit(path))
query = parse_qs(parts[3])
if "versionId" in query:
raise ValueError("path already includes a version query")

Check warning on line 142 in src/datachain/client/s3.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/client/s3.py#L142

Added line #L142 was not covered by tests
parts[3] = f"versionId={version_id}" if version_id else ""
return urlunsplit(parts)

@classmethod
def version_path(cls, path: str, version_id: Optional[str]) -> str:
path, _ = cls._split_version(path)
return cls._join_version(path, version_id)

async def _fetch_dir(
self,
prefix,
Expand Down

0 comments on commit 4333400

Please sign in to comment.