Skip to content

Commit

Permalink
feat: support zstd compression in miniostorage
Browse files Browse the repository at this point in the history
we want to use zstd compression when compressing files for storage in
object storage because it performs better than gzip which is what we
were using before

these changes are only being made to the minio storage service because
we want to consolidate the storage service functionality into this one
so both worker and API will be using this backend in the future (API was
already using this one)

we have to manually decompress the zstd compressed files in read_file
but HTTPResponse takes care of it for us if the content encoding of the
file is gzip

the is_already_gzipped argument is being deprecated in favour of
compression_type and is_compressed, also the ability to pass a str to
write_file is being deprecated. we're keeping track of the use of these
using sentry capture_message
  • Loading branch information
joseph-sentry committed Nov 26, 2024
1 parent 12361de commit 1ad7fa7
Show file tree
Hide file tree
Showing 5 changed files with 465 additions and 263 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies = [
"requests>=2.32.3",
"sentry-sdk>=2.18.0",
"sqlalchemy<2",
"zstandard==0.23.0",
]

[build-system]
Expand Down
179 changes: 128 additions & 51 deletions shared/storage/minio.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import gzip
import datetime
import json
import logging
import os
import shutil
import sys
import tempfile
from io import BytesIO
from typing import BinaryIO, overload
from typing import BinaryIO, Protocol, overload

import sentry_sdk
import sentry_sdk.scope
import zstandard
from minio import Minio
from minio.credentials import (
ChainedProvider,
Expand All @@ -17,13 +19,29 @@
)
from minio.deleteobjects import DeleteObject
from minio.error import MinioException, S3Error
from urllib3.response import HTTPResponse

from shared.storage.base import CHUNK_SIZE, BaseStorageService
from shared.storage.base import BaseStorageService
from shared.storage.exceptions import BucketAlreadyExistsError, FileNotInStorageError

log = logging.getLogger(__name__)


class Readable(Protocol):
def read(self, size: int = -1) -> bytes: ...


class GetObjectToFileResponse(Protocol):
bucket_name: str
object_name: str
last_modified: datetime.datetime | None
etag: str
size: int
content_type: str | None
metadata: dict[str, str]
version_id: str | None


# Service class for interfacing with codecov's underlying storage layer, minio
class MinioStorageService(BaseStorageService):
def __init__(self, minio_config):
Expand Down Expand Up @@ -57,20 +75,21 @@ def init_minio_client(
region: str = None,
):
"""
Initialize the minio client
Initialize the minio client
`iam_auth` adds support for IAM base authentication in a fallback pattern.
The following will be checked in order:
The following will be checked in order:
* EC2 metadata -- a custom endpoint can be provided, default is None.
* AWS env vars, specifically AWS_ACCESS_KEY and AWS_SECRECT_KEY
* Minio env vars, specifically MINIO_ACCESS_KEY and MINIO_SECRET_KEY
* AWS env vars, specifically AWS_ACCESS_KEY and AWS_SECRECT_KEY
to support backward compatibility, the iam_auth setting should be used in the installation
configuration
to support backward compatibility, the iam_auth setting should be used
in the installation configuration
Args:
host (str): The address of the host where minio lives
port (str): The port number (as str or int should be ok)
access_key (str, optional): The access key (optional if IAM is being used)
secret_key (str, optional): The secret key (optional if IAM is being used)
Expand Down Expand Up @@ -143,50 +162,64 @@ def create_root_storage(self, bucket_name="archive", region="us-east-1"):
# Writes a file to storage will gzip if not compressed already
def write_file(
self,
bucket_name,
path,
data,
reduced_redundancy=False,
bucket_name: str,
path: str,
data: BinaryIO,
reduced_redundancy: bool = False,
*,
is_already_gzipped: bool = False,
is_already_gzipped: bool = False, # deprecated
is_compressed: bool = False,
compression_type: str = "zstd",
):
if is_already_gzipped:
log.warning(
"is_already_gzipped is deprecated and will be removed in a future version, instead compress using zstd and use the is_already_zstd_compressed argument"
)
with sentry_sdk.new_scope() as scope:
scope.set_extra("bucket_name", bucket_name)
scope.set_extra("path", path)
sentry_sdk.capture_message("is_already_gzipped passed with True")
is_compressed = True
compression_type = "gzip"

if isinstance(data, str):
data = data.encode()
log.warning(
"passing data as a str to write_file is deprecated and will be removed in a future version, instead pass an object compliant with the BinaryIO type"
)
with sentry_sdk.new_scope() as scope:
scope.set_extra("bucket_name", bucket_name)
scope.set_extra("path", path)
sentry_sdk.capture_message("write_file data argument passed as str")

if isinstance(data, bytes):
if not is_already_gzipped:
out = BytesIO()
with gzip.GzipFile(fileobj=out, mode="w", compresslevel=9) as gz:
gz.write(data)
else:
out = BytesIO(data)

# get file size
out.seek(0, os.SEEK_END)
out_size = out.tell()
else:
# data is already a file-like object
if not is_already_gzipped:
_, filename = tempfile.mkstemp()
with gzip.open(filename, "wb") as f:
shutil.copyfileobj(data, f)
out = open(filename, "rb")
else:
out = data
data = BytesIO(data.encode())

out_size = os.stat(filename).st_size
if not is_compressed:
cctx = zstandard.ZstdCompressor()
reader: zstandard.ZstdCompressionReader = cctx.stream_reader(data)
_, filepath = tempfile.mkstemp()
with open(filepath, "wb") as f:
while chunk := reader.read(16384):
f.write(chunk)
data = open(filepath, "rb")

try:
# reset pos for minio reading.
out.seek(0)
out_size = data.seek(0, os.SEEK_END)
data.seek(0)

if compression_type == "gzip":
content_encoding = "gzip"
elif compression_type == "zstd":
content_encoding = "zstd"

headers = {"Content-Encoding": content_encoding}

headers = {"Content-Encoding": "gzip"}
if reduced_redundancy:
headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"

self.minio_client.put_object(
bucket_name,
path,
out,
data,
out_size,
metadata=headers,
content_type="text/plain",
Expand All @@ -195,25 +228,65 @@ def write_file(

except MinioException:
raise
finally:
if not is_compressed:
data.close()
os.unlink(filepath)

@overload
def read_file(self, bucket_name: str, path: str) -> bytes: ...
def read_file(
self, bucket_name: str, path: str, file_obj: None = None
) -> bytes: ...

@overload
def read_file(self, bucket_name: str, path: str, file_obj: BinaryIO) -> None: ...
def read_file(self, bucket_name: str, path: str, file_obj: str) -> None: ...

def read_file(self, bucket_name, path, file_obj=None) -> bytes | None:
try:
res = self.minio_client.get_object(bucket_name, path)
if file_obj is None:
data = BytesIO()
for d in res.stream(CHUNK_SIZE):
data.write(d)
data.seek(0)
return data.getvalue()
headers = {"Accept-Encoding": "gzip, zstd"}
if file_obj:
_, tmpfilepath = tempfile.mkstemp()
to_file_response: GetObjectToFileResponse = (
self.minio_client.fget_object(
bucket_name, path, tmpfilepath, request_headers=headers
)
)
data = open(tmpfilepath, "rb")
content_encoding = to_file_response.metadata.get(
"Content-Encoding", None
)
else:
response: HTTPResponse = self.minio_client.get_object(
bucket_name, path, request_headers=headers
)
data = response
content_encoding = response.headers.get("Content-Encoding", None)

reader: Readable | None = None
if content_encoding == "gzip":
# HTTPResponse automatically decodes gzipped data for us
# minio_client.fget_object uses HTTPResponse under the hood,
# so this applies to both get_object and fget_object
reader = data
elif content_encoding == "zstd":
# we have to manually decompress zstandard compressed data
cctx = zstandard.ZstdDecompressor()
reader = cctx.stream_reader(data)
else:
with sentry_sdk.new_scope() as scope:
scope.set_extra("bucket_name", bucket_name)
scope.set_extra("path", path)
raise ValueError("Blob does not have Content-Encoding set")

if file_obj:
while chunk := reader.read(16384):
file_obj.write(chunk)
return None
else:
for d in res.stream(CHUNK_SIZE):
file_obj.write(d)
res = BytesIO()
while chunk := reader.read(16384):
res.write(chunk)
return res.getvalue()
except S3Error as e:
if e.code == "NoSuchKey":
raise FileNotInStorageError(
Expand All @@ -222,6 +295,10 @@ def read_file(self, bucket_name, path, file_obj=None) -> bytes | None:
raise e
except MinioException:
raise
finally:
if file_obj:
data.close()
os.unlink(tmpfilepath)

"""
Deletes file url in specified bucket.
Expand Down
Empty file added tests/unit/storage/__init__.py
Empty file.
Loading

0 comments on commit 1ad7fa7

Please sign in to comment.