Add simple upload compression support. (#183)
Add simple upload compression support. Refactor compression flags for easier configuration during initialization.
mooman219 authored and houglum committed Nov 7, 2017
1 parent b9bdee9 commit 57873d8
Showing 5 changed files with 213 additions and 44 deletions.
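Before the hunks, a quick orientation: the new gzip_encoded flag threads from the Upload constructors down to the request logic. A minimal usage sketch (the module path is real apitools; the filename and mime type are placeholders):

    from apitools.base.py import transfer

    # gzip_encoded is the flag this commit adds to Upload and its
    # From* constructors; 'data.json' is an illustrative local file.
    upload = transfer.Upload.FromFile(
        'data.json', mime_type='application/json', gzip_encoded=True)
    # For a simple (non-resumable) upload, ConfigureRequest then gzips
    # the request body and sets the Content-Encoding: gzip header.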
9 changes: 6 additions & 3 deletions apitools/base/py/compression.py
@@ -26,7 +26,8 @@
 
 
 # pylint: disable=invalid-name
-def CompressStream(in_stream, length, compresslevel=2, chunksize=16777216):
+def CompressStream(in_stream, length=None, compresslevel=2,
+                   chunksize=16777216):
 
     """Compresses an input stream into a file-like buffer.
@@ -43,7 +44,9 @@ def CompressStream(in_stream, length, compresslevel=2, chunksize=16777216):
             smaller than expected. Because data is written to the output
             buffer in increments of the chunksize, the output buffer may be
             larger than length by chunksize. Very uncompressible data can
-            exceed this further if gzip inflates the underlying data.
+            exceed this further if gzip inflates the underlying data. If
+            length is None, the input stream will be compressed until
+            it is exhausted.
         compresslevel: Optional, defaults to 2. The desired compression level.
         chunksize: Optional, defaults to 16MiB. The chunk size used when
             reading data from the input stream to write into the output
@@ -61,7 +64,7 @@ def CompressStream(in_stream, length, compresslevel=2, chunksize=16777216):
                        fileobj=out_stream,
                        compresslevel=compresslevel) as compress_stream:
         # Read until we've written at least length bytes to the output stream.
-        while out_stream.length < length:
+        while not length or out_stream.length < length:
             data = in_stream.read(chunksize)
             data_length = len(data)
             compress_stream.write(data)
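Taken on its own, the new length=None default enables one-shot compression of a whole stream. A minimal sketch of that mode (the payload is illustrative):

    import six

    from apitools.base.py import compression

    in_stream = six.BytesIO(b'hello world' * 1024)
    # length=None: keep compressing until the input stream is exhausted.
    out_stream, read, exhausted = compression.CompressStream(
        in_stream, length=None, compresslevel=9)
    assert exhausted
    assert read == len(b'hello world') * 1024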
16 changes: 16 additions & 0 deletions apitools/base/py/compression_test.py
@@ -50,6 +50,22 @@ def testCompressionExhausted(self):
         # Ensure the input stream was exhausted.
         self.assertTrue(exhausted)
 
+    def testCompressionUnbounded(self):
+        """Test unbounded compression.
+
+        Test that the input stream is exhausted when length is None.
+        """
+        output, read, exhausted = compression.CompressStream(
+            self.stream,
+            None,
+            9)
+        # Ensure the compressed buffer is smaller than the input buffer.
+        self.assertLess(output.length, self.length)
+        # Ensure we read the entire input stream.
+        self.assertEqual(read, self.length)
+        # Ensure the input stream was exhausted.
+        self.assertTrue(exhausted)
+
     def testCompressionPartial(self):
         """Test partial compression.
2 changes: 1 addition & 1 deletion apitools/base/py/gzip.py
@@ -47,7 +47,7 @@ def write32u(output, value):
     output.write(struct.pack("<L", value))
 
 
-class _PaddedFile:
+class _PaddedFile(object):
     """Minimal read-only file object that prepends a string to the contents
     of an actual file. Shouldn't be used outside of gzip.py, as it lacks
     essential functionality."""
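This one-line change matters under Python 2, where a class that does not inherit from object is an old-style class; deriving _PaddedFile from object gives it new-style class semantics consistent with the rest of the module (the rationale is inferred from the diff, not stated in the commit message).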
48 changes: 33 additions & 15 deletions apitools/base/py/transfer.py
@@ -549,7 +549,7 @@ class Upload(_Transfer):
     def __init__(self, stream, mime_type, total_size=None, http=None,
                  close_stream=False, chunksize=None, auto_transfer=True,
                  progress_callback=None, finish_callback=None,
-                 **kwds):
+                 gzip_encoded=False, **kwds):
         super(Upload, self).__init__(
             stream, close_stream=close_stream, chunksize=chunksize,
             auto_transfer=auto_transfer, http=http, **kwds)
@@ -560,6 +560,7 @@ def __init__(self, stream, mime_type, total_size=None, http=None,
         self.__server_chunk_granularity = None
         self.__strategy = None
         self.__total_size = None
+        self.__gzip_encoded = gzip_encoded
 
         self.progress_callback = progress_callback
         self.finish_callback = finish_callback
@@ -570,7 +571,8 @@ def progress(self):
         return self.__progress
 
     @classmethod
-    def FromFile(cls, filename, mime_type=None, auto_transfer=True, **kwds):
+    def FromFile(cls, filename, mime_type=None, auto_transfer=True,
+                 gzip_encoded=False, **kwds):
         """Create a new Upload object from a filename."""
         path = os.path.expanduser(filename)
         if not os.path.exists(path):
@@ -582,20 +584,23 @@ def FromFile(cls, filename, mime_type=None, auto_transfer=True, **kwds):
                 'Could not guess mime type for %s' % path)
         size = os.stat(path).st_size
         return cls(open(path, 'rb'), mime_type, total_size=size,
-                   close_stream=True, auto_transfer=auto_transfer, **kwds)
+                   close_stream=True, auto_transfer=auto_transfer,
+                   gzip_encoded=gzip_encoded, **kwds)
 
     @classmethod
     def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True,
-                   **kwds):
+                   gzip_encoded=False, **kwds):
         """Create a new Upload object from a stream."""
         if mime_type is None:
             raise exceptions.InvalidUserInputError(
                 'No mime_type specified for stream')
         return cls(stream, mime_type, total_size=total_size,
-                   close_stream=False, auto_transfer=auto_transfer, **kwds)
+                   close_stream=False, auto_transfer=auto_transfer,
+                   gzip_encoded=gzip_encoded, **kwds)
 
     @classmethod
-    def FromData(cls, stream, json_data, http, auto_transfer=None, **kwds):
+    def FromData(cls, stream, json_data, http, auto_transfer=None,
+                 gzip_encoded=False, **kwds):
         """Create a new Upload of stream from serialized json_data and http."""
         info = json.loads(json_data)
         missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys())
@@ -607,7 +612,8 @@ def FromData(cls, stream, json_data, http, auto_transfer=None, **kwds):
             raise exceptions.InvalidUserInputError(
                 'Cannot override total_size on serialized Upload')
         upload = cls.FromStream(stream, info['mime_type'],
-                                total_size=info.get('total_size'), **kwds)
+                                total_size=info.get('total_size'),
+                                gzip_encoded=gzip_encoded, **kwds)
         if isinstance(stream, io.IOBase) and not stream.seekable():
             raise exceptions.InvalidUserInputError(
                 'Cannot restart resumable upload on non-seekable stream')
@@ -724,6 +730,17 @@ def ConfigureRequest(self, upload_config, http_request, url_builder):
             else:
                 url_builder.query_params['uploadType'] = 'media'
                 self.__ConfigureMediaRequest(http_request)
+            # Once the entire body is written, compress the body if configured
+            # to. Both multipart and media request uploads will read the
+            # entire stream into memory, which means full compression is also
+            # safe to perform. Because the strategy is set to SIMPLE_UPLOAD,
+            # StreamInChunks throws an exception, meaning double compression
+            # cannot happen.
+            if self.__gzip_encoded:
+                http_request.headers['Content-Encoding'] = 'gzip'
+                body_buffer = six.BytesIO(http_request.body)
+                body, _, _ = compression.CompressStream(body_buffer)
+                http_request.body = body
         else:
             url_builder.relative_path = upload_config.resumable_path
             url_builder.query_params['uploadType'] = 'resumable'
@@ -866,8 +883,7 @@ def __ValidateChunksize(self, chunksize=None):
                 self.__server_chunk_granularity)
 
     def __StreamMedia(self, callback=None, finish_callback=None,
-                      additional_headers=None, use_chunks=True,
-                      compressed=False):
+                      additional_headers=None, use_chunks=True):
         """Helper function for StreamMedia / StreamInChunks."""
         if self.strategy != RESUMABLE_UPLOAD:
             raise exceptions.InvalidUserInputError(
@@ -879,14 +895,16 @@ def __StreamMedia(self, callback=None, finish_callback=None,

         def CallSendChunk(start):
             return self.__SendChunk(
-                start, additional_headers=additional_headers,
-                compressed=compressed)
+                start, additional_headers=additional_headers)
 
         def CallSendMediaBody(start):
             return self.__SendMediaBody(
                 start, additional_headers=additional_headers)
 
         send_func = CallSendChunk if use_chunks else CallSendMediaBody
+        if not use_chunks and self.__gzip_encoded:
+            raise exceptions.InvalidUserInputError(
+                'Cannot gzip encode non-chunked upload')
         if use_chunks:
             self.__ValidateChunksize(self.chunksize)
         self.EnsureInitialized()
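As the new guard implies, a gzip-encoded resumable upload must take the chunked path; the non-chunked StreamMedia path is rejected outright. A sketch, assuming upload was built with gzip_encoded=True and a resumable strategy:

    # upload.StreamMedia() would raise InvalidUserInputError
    # ('Cannot gzip encode non-chunked upload').
    upload.StreamInChunks()  # each chunk is gzip-compressed before sending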
@@ -934,11 +952,11 @@ def StreamMedia(self, callback=None, finish_callback=None,
             additional_headers=additional_headers, use_chunks=False)
 
     def StreamInChunks(self, callback=None, finish_callback=None,
-                       additional_headers=None, compressed=False):
+                       additional_headers=None):
         """Send this (resumable) upload in chunks."""
         return self.__StreamMedia(
             callback=callback, finish_callback=finish_callback,
-            additional_headers=additional_headers, compressed=compressed)
+            additional_headers=additional_headers)
 
     def __SendMediaRequest(self, request, end):
         """Request helper function for SendMediaBody & SendChunk."""
@@ -983,12 +1001,12 @@ def __SendMediaBody(self, start, additional_headers=None):

         return self.__SendMediaRequest(request, self.total_size)
 
-    def __SendChunk(self, start, additional_headers=None, compressed=False):
+    def __SendChunk(self, start, additional_headers=None):
         """Send the specified chunk."""
         self.EnsureInitialized()
         no_log_body = self.total_size is None
         request = http_wrapper.Request(url=self.url, http_method='PUT')
-        if compressed:
+        if self.__gzip_encoded:
             request.headers['Content-Encoding'] = 'gzip'
             body_stream, read_length, exhausted = compression.CompressStream(
                 self.stream, self.chunksize)
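In the chunked path, each __SendChunk call asks CompressStream for roughly one chunksize of compressed output starting at the stream's current position, so memory use stays bounded regardless of total upload size. A standalone sketch of that loop (stream contents are illustrative):

    import six

    from apitools.base.py import compression

    stream = six.BytesIO(b'x' * (1 << 20))
    chunksize = 256 * 1024
    exhausted = False
    while not exhausted:
        # Each call yields at most about one chunksize of compressed bytes
        # (less once the input runs out), mirroring what __SendChunk sends
        # in a single PUT request.
        body, read, exhausted = compression.CompressStream(stream, chunksize)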