-
Notifications
You must be signed in to change notification settings - Fork 112
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
1,471 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
#!/usr/bin/env python | ||
# | ||
# Copyright 2017 Google Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""Compression support for apitools.""" | ||
|
||
from collections import deque | ||
|
||
from apitools.base.py import gzip | ||
|
||
__all__ = [ | ||
'CompressStream', | ||
] | ||
|
||
|
||
# pylint: disable=invalid-name | ||
def CompressStream(in_stream, length, compresslevel=2, chunksize=16777216): | ||
|
||
"""Compresses an input stream into a file-like buffer. | ||
This reads from the input stream until either we've stored at least length | ||
compressed bytes, or the input stream has been exhausted. | ||
This supports streams of unknown size. | ||
Args: | ||
in_stream: The input stream to read from. | ||
length: The target number of compressed bytes to buffer in the output | ||
stream. The actual length of the output buffer can vary from this | ||
length. If the input stream is exhaused, the output buffer may be | ||
smaller than expected. Because data is written to the output | ||
buffer in increments of the chunksize, the output buffer may be | ||
larger than length by chunksize. Very uncompressible data can | ||
exceed this further if gzip inflates the underlying data. | ||
compresslevel: Optional, defaults to 2. The desired compression level. | ||
chunksize: Optional, defaults to 16MiB. The chunk size used when | ||
reading data from the input stream to write into the output | ||
buffer. | ||
Returns: | ||
A file-like output buffer of compressed bytes, the number of bytes read | ||
from the input stream, and a flag denoting if the input stream was | ||
exhausted. | ||
""" | ||
in_read = 0 | ||
in_exhausted = False | ||
out_stream = StreamingBuffer() | ||
with gzip.GzipFile(mode='wb', | ||
fileobj=out_stream, | ||
compresslevel=compresslevel) as compress_stream: | ||
# Read until we've written at least length bytes to the output stream. | ||
while out_stream.length < length: | ||
data = in_stream.read(chunksize) | ||
data_length = len(data) | ||
compress_stream.write(data) | ||
in_read += data_length | ||
# If we read less than requested, the stream is exhausted. | ||
if data_length < chunksize: | ||
in_exhausted = True | ||
break | ||
return out_stream, in_read, in_exhausted | ||
|
||
|
||
class StreamingBuffer(object): | ||
|
||
"""Provides a file-like object that writes to a temporary buffer. | ||
When data is read from the buffer, it is permanently removed. This is | ||
useful when there are memory constraints preventing the entire buffer from | ||
being stored in memory. | ||
""" | ||
|
||
def __init__(self): | ||
# The buffer of byte arrays. | ||
self.__buf = deque() | ||
# The number of bytes in __buf. | ||
self.__size = 0 | ||
|
||
def __len__(self): | ||
return self.__size | ||
|
||
def __nonzero__(self): | ||
# For 32-bit python2.x, len() cannot exceed a 32-bit number; avoid | ||
# accidental len() calls from httplib in the form of "if this_object:". | ||
return bool(self.__size) | ||
|
||
@property | ||
def length(self): | ||
# For 32-bit python2.x, len() cannot exceed a 32-bit number. | ||
return self.__size | ||
|
||
def write(self, data): | ||
# Gzip can write many 0 byte chunks for highly compressible data. | ||
# Prevent them from being added internally. | ||
if data is not None and data: | ||
self.__buf.append(data) | ||
self.__size += len(data) | ||
|
||
def read(self, size=None): | ||
"""Read at most size bytes from this buffer. | ||
Bytes read from this buffer are consumed and are permanently removed. | ||
Args: | ||
size: If provided, read no more than size bytes from the buffer. | ||
Otherwise, this reads the entire buffer. | ||
Returns: | ||
The bytes read from this buffer. | ||
""" | ||
if size is None: | ||
size = self.__size | ||
ret_list = [] | ||
while size > 0 and self.__buf: | ||
data = self.__buf.popleft() | ||
size -= len(data) | ||
ret_list.append(data) | ||
if size < 0: | ||
ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:] | ||
self.__buf.appendleft(remainder) | ||
ret = b''.join(ret_list) | ||
self.__size -= len(ret) | ||
return ret |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
#!/usr/bin/env python | ||
# | ||
# Copyright 2017 Google Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""Tests for compression.""" | ||
|
||
from apitools.base.py import compression | ||
from apitools.base.py import gzip | ||
|
||
import six | ||
import unittest2 | ||
|
||
|
||
class CompressionTest(unittest2.TestCase): | ||
|
||
def setUp(self): | ||
# Sample highly compressible data (~50MB). | ||
self.sample_data = b'abc' * 16777216 | ||
# Stream of the sample data. | ||
self.stream = six.BytesIO() | ||
self.stream.write(self.sample_data) | ||
self.length = self.stream.tell() | ||
self.stream.seek(0) | ||
|
||
def testCompressionExhausted(self): | ||
"""Test full compression. | ||
Test that highly compressible data is actually compressed in entirety. | ||
""" | ||
output, read, exhausted = compression.CompressStream( | ||
self.stream, | ||
self.length, | ||
9) | ||
# Ensure the compressed buffer is smaller than the input buffer. | ||
self.assertLess(output.length, self.length) | ||
# Ensure we read the entire input stream. | ||
self.assertEqual(read, self.length) | ||
# Ensure the input stream was exhausted. | ||
self.assertTrue(exhausted) | ||
|
||
def testCompressionPartial(self): | ||
"""Test partial compression. | ||
Test that the length parameter works correctly. The amount of data | ||
that's compressed can be greater than or equal to the requested length. | ||
""" | ||
output_length = 40 | ||
output, _, exhausted = compression.CompressStream( | ||
self.stream, | ||
output_length, | ||
9) | ||
# Ensure the requested read size is <= the compressed buffer size. | ||
self.assertLessEqual(output_length, output.length) | ||
# Ensure the input stream was not exhausted. | ||
self.assertFalse(exhausted) | ||
|
||
def testCompressionIntegrity(self): | ||
"""Test that compressed data can be decompressed.""" | ||
output, read, exhausted = compression.CompressStream( | ||
self.stream, | ||
self.length, | ||
9) | ||
# Ensure uncompressed data matches the sample data. | ||
with gzip.GzipFile(fileobj=output) as f: | ||
original = f.read() | ||
self.assertEqual(original, self.sample_data) | ||
# Ensure we read the entire input stream. | ||
self.assertEqual(read, self.length) | ||
# Ensure the input stream was exhausted. | ||
self.assertTrue(exhausted) | ||
|
||
|
||
class StreamingBufferTest(unittest2.TestCase): | ||
|
||
def setUp(self): | ||
self.stream = compression.StreamingBuffer() | ||
|
||
def testSimpleStream(self): | ||
"""Test simple stream operations. | ||
Test that the stream can be written to and read from. Also test that | ||
reading from the stream consumes the bytes. | ||
""" | ||
# Ensure the stream is empty. | ||
self.assertEqual(self.stream.length, 0) | ||
# Ensure data is correctly written. | ||
self.stream.write(b'Sample data') | ||
self.assertEqual(self.stream.length, 11) | ||
# Ensure data can be read and the read data is purged from the stream. | ||
data = self.stream.read(11) | ||
self.assertEqual(data, b'Sample data') | ||
self.assertEqual(self.stream.length, 0) | ||
|
||
def testPartialReads(self): | ||
"""Test partial stream reads. | ||
Test that the stream can be read in chunks while perserving the | ||
consumption mechanics. | ||
""" | ||
self.stream.write(b'Sample data') | ||
# Ensure data can be read and the read data is purged from the stream. | ||
data = self.stream.read(6) | ||
self.assertEqual(data, b'Sample') | ||
self.assertEqual(self.stream.length, 5) | ||
# Ensure the remaining data can be read. | ||
data = self.stream.read(5) | ||
self.assertEqual(data, b' data') | ||
self.assertEqual(self.stream.length, 0) | ||
|
||
def testTooShort(self): | ||
"""Test excessive stream reads. | ||
Test that more data can be requested from the stream than available | ||
without raising an exception. | ||
""" | ||
self.stream.write(b'Sample') | ||
# Ensure requesting more data than available does not raise an | ||
# exception. | ||
data = self.stream.read(100) | ||
self.assertEqual(data, b'Sample') | ||
self.assertEqual(self.stream.length, 0) |
Oops, something went wrong.