Skip to content

Commit

Permalink
Avoid bucket metadata requests talking to GCS (#347)
Browse files Browse the repository at this point in the history
The GCS Storage would previously fetch the bucket metadata using `get_bucket`, whereas a call to `bucket` is sufficient to create the Python class without doing a request.
  • Loading branch information
Swatinem authored Sep 16, 2024
1 parent d26f241 commit 8650a3c
Show file tree
Hide file tree
Showing 20 changed files with 43 additions and 4,326 deletions.
2 changes: 1 addition & 1 deletion shared/storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def delete_file(self, bucket_name, path):
"""
raise NotImplementedError()

def delete_files(self, bucket_name, paths=[]):
def delete_files(self, bucket_name: str, paths: list[str]) -> list[bool]:
"""Batch deletes a list of files from a given bucket
(what happens to the files that don't exist?)
Expand Down
61 changes: 28 additions & 33 deletions shared/storage/gcp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import gzip
import logging
from typing import IO

import google.cloud.exceptions
from google.cloud import storage
Expand All @@ -26,8 +27,8 @@ def load_credentials(self, gcp_config):
return Credentials.from_service_account_info(gcp_config)

def get_blob(self, bucket_name: str, path: str):
    """Return a `Blob` handle for `path` inside `bucket_name`.

    Uses `Client.bucket()` / `Bucket.blob()`, which construct the
    objects locally without issuing any network request to GCS
    (unlike `get_bucket()`, which fetches the bucket metadata).

    Args:
        bucket_name (str): name of the bucket the blob lives in
        path (str): path of the blob inside the bucket

    Returns:
        google.cloud.storage.Blob: a lazy handle; no request is made
        until the blob is actually read/written/deleted.
    """
    bucket = self.storage_client.bucket(bucket_name)
    return bucket.blob(path)

def create_root_storage(self, bucket_name="archive", region="us-east-1"):
"""
Expand All @@ -48,29 +49,27 @@ def create_root_storage(self, bucket_name="archive", region="us-east-1"):

def write_file(
self,
bucket_name,
path,
data,
bucket_name: str,
path: str,
data: str | bytes | IO,
reduced_redundancy=False,
*,
is_already_gzipped: bool = False,
):
"""
Writes a new file with the contents of `data`
(What happens if the file already exists?)
Writes a new file with the contents of `data`
(What happens if the file already exists?)
Args:
bucket_name (str): The name of the bucket for the file to be created on
path (str): The desired path of the file
data (str): The data to be written to the file
data: The data to be written to the file
reduced_redundancy (bool): Whether a reduced redundancy mode should be used (default: {False})
is_already_gzipped (bool): Whether the file is already gzipped (default: {False})
Raises:
NotImplementedError: If the current instance did not implement this method
"""
blob = self.get_blob(bucket_name, path)

if isinstance(data, str):
data = data.encode()
if isinstance(data, bytes):
Expand All @@ -84,21 +83,23 @@ def write_file(
blob.upload_from_file(data)
return True

def read_file(self, bucket_name, path, file_obj=None, *, retry=0):
def read_file(
self, bucket_name: str, path: str, file_obj: IO | None = None, retry=0
) -> bytes:
"""Reads the content of a file
Args:
bucket_name (str): The name of the bucket for the file lives
path (str): The path of the file
Raises:
NotImplementedError: If the current instance did not implement this method
FileNotInStorageError: If the file does not exist
Returns:
bytes : The contents of that file, still encoded as bytes
bytes: The contents of that file, still encoded as bytes
"""
blob = self.get_blob(bucket_name, path)

try:
blob.reload()
if (
Expand All @@ -108,10 +109,12 @@ def read_file(self, bucket_name, path, file_obj=None, *, retry=0):
blob.content_type = "text/plain"
blob.content_encoding = "gzip"
blob.patch()

if file_obj is None:
return blob.download_as_bytes(checksum="crc32c")
else:
blob.download_to_file(file_obj, checksum="crc32c")
return b"" # NOTE: this is to satisfy the return type
except google.cloud.exceptions.NotFound:
raise FileNotInStorageError(f"File {path} does not exist in {bucket_name}")
except google.resumable_media.common.DataCorruption:
Expand All @@ -120,19 +123,18 @@ def read_file(self, bucket_name, path, file_obj=None, *, retry=0):
return self.read_file(bucket_name, path, file_obj, retry=1)
raise

def delete_file(self, bucket_name, path):
def delete_file(self, bucket_name: str, path: str) -> bool:
"""Deletes a single file from the storage (what happens if the file doesnt exist?)
Args:
bucket_name (str): The name of the bucket for the file lives
path (str): The path of the file to be deleted
Raises:
NotImplementedError: If the current instance did not implement this method
FileNotInStorageError: If the file does not exist
Returns:
bool: True if the deletion was succesful
bool: True if the deletion was successful
"""
blob = self.get_blob(bucket_name, path)
try:
Expand All @@ -141,28 +143,25 @@ def delete_file(self, bucket_name, path):
raise FileNotInStorageError(f"File {path} does not exist in {bucket_name}")
return True

def delete_files(self, bucket_name: str, paths: list[str]) -> list[bool]:
    """Batch deletes a list of files from a given bucket.

    Args:
        bucket_name (str): The name of the bucket the files live in
        paths (list[str]): A list of the paths to be deleted

    Returns:
        list[bool]: one flag per input path, in order — True when that
        file was deleted successfully, False when deletion errored
        (e.g. the file did not exist).
    """
    # `bucket()` builds the handle locally; no metadata request to GCS.
    bucket = self.storage_client.bucket(bucket_name)
    blobs = [bucket.blob(path) for path in paths]
    # `delete_blobs` calls `on_error` for each blob that failed to
    # delete instead of raising, so collect the failures as we go.
    blobs_errored = set()
    bucket.delete_blobs(blobs, on_error=blobs_errored.add)
    return [b not in blobs_errored for b in blobs]

def list_folder_contents(self, bucket_name: str, prefix=None, recursive=True):
    """List the contents of a specific folder.

    Attention: google ignores the `recursive` param — GCS listing is
    always recursive, which is why non-recursive listing is rejected.

    Args:
        bucket_name (str): The name of the bucket the files live in
        prefix: The prefix of the files to be listed (default: {None})
        recursive: Whether the listing should be recursive (default: {True});
            must be True, kept only for interface compatibility

    Returns:
        A generator of {"name": str, "size": int} dicts, one per blob.
    """
    assert recursive  # GCS cannot list non-recursively; refuse to pretend otherwise
    # `bucket()` builds the handle locally; the request happens lazily
    # when the returned generator is consumed via `list_blobs`.
    bucket = self.storage_client.bucket(bucket_name)
    return (
        {"name": b.name, "size": b.size} for b in bucket.list_blobs(prefix=prefix)
    )

This file was deleted.

Loading

0 comments on commit 8650a3c

Please sign in to comment.