-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
cf1ecf4
commit 9c57ee9
Showing
4 changed files
with
219 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
"""Access S3 service.""" | ||
|
||
from collections.abc import Iterator | ||
from contextlib import contextmanager | ||
from dataclasses import dataclass, field | ||
from typing import TYPE_CHECKING | ||
|
||
from pytest_moto_fixtures.utils import NoArgs, randstr | ||
|
||
if TYPE_CHECKING: | ||
from mypy_boto3_s3 import S3Client | ||
from mypy_boto3_s3.type_defs import BlobTypeDef, GetObjectOutputTypeDef, ObjectTypeDef | ||
|
||
|
||
@dataclass(kw_only=True, frozen=True) | ||
class S3Bucket: | ||
"""Bucket in S3 service.""" | ||
|
||
client: 'S3Client' = field(repr=False) | ||
"""S3 Client.""" | ||
name: str | ||
"""Bucket name.""" | ||
|
||
def __len__(self) -> int: | ||
"""Number of objects in bucket. | ||
Returns: | ||
Number of objects in bucket. | ||
""" | ||
response = self.client.list_objects_v2(Bucket=self.name) | ||
size = response['KeyCount'] | ||
while response['IsTruncated']: | ||
response = self.client.list_objects_v2( | ||
Bucket=self.name, ContinuationToken=response['NextContinuationToken'] | ||
) | ||
size += response['KeyCount'] | ||
return size | ||
|
||
def __getitem__(self, key: str, /) -> 'GetObjectOutputTypeDef': | ||
"""Get object in bucket. | ||
Args: | ||
key: Key of object. | ||
Returns: | ||
Object in bucket. | ||
""" | ||
return self.client.get_object(Bucket=self.name, Key=key) | ||
|
||
def __setitem__(self, key: str, value: 'BlobTypeDef', /) -> None: | ||
"""Put object in bucket. | ||
Args: | ||
key: Key of object. | ||
value: Content of object. | ||
""" | ||
self.client.put_object(Bucket=self.name, Key=key, Body=value) | ||
|
||
def __delitem__(self, key: str, /) -> None: | ||
"""Delete object in bucket. | ||
Args: | ||
key: Key of object. | ||
""" | ||
self.client.delete_object(Bucket=self.name, Key=key) | ||
|
||
def __iter__(self) -> Iterator['ObjectTypeDef']: | ||
"""Iterates over objects in bucket. | ||
Returns: | ||
Iterator over objects. | ||
""" | ||
response = self.client.list_objects_v2(Bucket=self.name) | ||
yield from response.get('Contents', []) | ||
while response['IsTruncated']: | ||
response = self.client.list_objects_v2( | ||
Bucket=self.name, ContinuationToken=response['NextContinuationToken'] | ||
) | ||
yield from response.get('Contents', []) | ||
|
||
def prune(self) -> None: | ||
"""Prune objects in bucket.""" | ||
for obj in self: | ||
del self[obj['Key']] | ||
|
||
|
||
@contextmanager | ||
def s3_create_bucket(*, s3_client: 'S3Client', name: str | NoArgs = NoArgs.NO_ARG) -> Iterator[S3Bucket]: | ||
"""Context for creating an S3 bucket and removing it on exit. | ||
Args: | ||
s3_client: S3 client where bucket will be created. | ||
name: Name of bucket to be created. If it is ``None`` a random name will be used. | ||
Return: | ||
Bucket created in S3 service. | ||
""" | ||
if isinstance(name, NoArgs): | ||
name = randstr() | ||
|
||
s3_client.create_bucket(Bucket=name) | ||
yield S3Bucket(client=s3_client, name=name) | ||
for bucket_object in s3_client.list_objects_v2(Bucket=name).get('Contents', []): | ||
s3_client.delete_object(Bucket=name, Key=bucket_object['Key']) | ||
s3_client.delete_bucket(Bucket=name) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
from random import randint | ||
from typing import TYPE_CHECKING | ||
|
||
import pytest | ||
from botocore.errorfactory import ClientError | ||
|
||
from pytest_moto_fixtures.services.s3 import S3Bucket, s3_create_bucket | ||
from pytest_moto_fixtures.utils import randstr | ||
|
||
if TYPE_CHECKING: | ||
from mypy_boto3_s3 import S3Client | ||
|
||
|
||
class TestS3Bucket: | ||
def test_attributes(self, s3_client: 'S3Client') -> None: | ||
name = randstr() | ||
|
||
sut = S3Bucket(client=s3_client, name=name) | ||
|
||
assert sut.client == s3_client | ||
assert sut.name == name | ||
|
||
def test_len(self, s3_bucket: 'S3Bucket') -> None: | ||
files = [randstr() for _ in range(randint(3, 10))] | ||
|
||
for expected, filename in enumerate(files, start=1): | ||
s3_bucket.client.put_object(Bucket=s3_bucket.name, Key=filename, Body=b'') | ||
|
||
assert len(s3_bucket) == expected | ||
|
||
for expected, filename in zip(reversed(range(len(files))), files, strict=True): | ||
s3_bucket.client.delete_object(Bucket=s3_bucket.name, Key=filename) | ||
|
||
assert len(s3_bucket) == expected | ||
|
||
def test_getitem(self, s3_bucket: 'S3Bucket') -> None: | ||
files = [(randstr(), randstr().encode()) for _ in range(randint(3, 10))] | ||
|
||
for filename, content in files: | ||
s3_bucket.client.put_object(Bucket=s3_bucket.name, Key=filename, Body=content) | ||
|
||
received = s3_bucket[filename] | ||
|
||
assert received['Body'].read() == content | ||
|
||
def test_setitem(self, s3_bucket: 'S3Bucket') -> None: | ||
files = [(randstr(), randstr().encode()) for _ in range(randint(3, 10))] | ||
|
||
for filename, content in files: | ||
s3_bucket[filename] = content | ||
|
||
assert s3_bucket.client.get_object(Bucket=s3_bucket.name, Key=filename)['Body'].read() == content | ||
|
||
def test_delitem(self, s3_bucket: 'S3Bucket') -> None: | ||
files = [randstr() for _ in range(randint(3, 10))] | ||
|
||
for filename in files: | ||
s3_bucket.client.put_object(Bucket=s3_bucket.name, Key=filename, Body='') | ||
|
||
del s3_bucket[filename] | ||
|
||
with pytest.raises(ClientError): | ||
s3_bucket.client.get_object(Bucket=s3_bucket.name, Key=filename) | ||
|
||
def test_iter(self, s3_bucket: 'S3Bucket') -> None: | ||
files = {randstr() for _ in range(randint(3, 10))} | ||
|
||
for filename in files: | ||
s3_bucket.client.put_object(Bucket=s3_bucket.name, Key=filename, Body='') | ||
|
||
assert {obj['Key'] for obj in s3_bucket} == files | ||
|
||
def test_prune(self, s3_bucket: 'S3Bucket') -> None: | ||
files = [randstr() for _ in range(randint(3, 10))] | ||
|
||
for filename in files: | ||
s3_bucket.client.put_object(Bucket=s3_bucket.name, Key=filename, Body='') | ||
|
||
s3_bucket.prune() | ||
|
||
assert 'Content' not in s3_bucket.client.list_objects_v2(Bucket=s3_bucket.name) | ||
|
||
|
||
class TestS3CreateBucket: | ||
def test_default_args(self, s3_client: 'S3Client') -> None: | ||
with s3_create_bucket(s3_client=s3_client) as sut: | ||
result = s3_client.list_buckets() | ||
assert sut.name in [bucket['Name'] for bucket in result['Buckets']] | ||
|
||
result = s3_client.list_buckets() | ||
assert sut.name not in [bucket['Name'] for bucket in result['Buckets']] | ||
|
||
def test_name_arg(self, s3_client: 'S3Client') -> None: | ||
name = randstr() | ||
|
||
with s3_create_bucket(s3_client=s3_client, name=name) as sut: | ||
assert sut.name == name |