Skip to content

Commit

Permalink
Merge pull request #182 from mooman219/feature-compression
Browse files Browse the repository at this point in the history
Add compression support for streams

Adds functionality necessary to compress streamed data on the fly, allowing for sending data in a compressed form (e.g. using the "Content-Encoding" HTTP header).  Applies compression only for chunked streams, compressing each chunk as it is streamed in.
  • Loading branch information
houglum committed Nov 1, 2017
2 parents 1c640d9 + 0048271 commit b9bdee9
Show file tree
Hide file tree
Showing 7 changed files with 1,471 additions and 11 deletions.
135 changes: 135 additions & 0 deletions apitools/base/py/compression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#!/usr/bin/env python
#
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Compression support for apitools."""

from collections import deque

from apitools.base.py import gzip

__all__ = [
'CompressStream',
]


# pylint: disable=invalid-name
def CompressStream(in_stream, length, compresslevel=2, chunksize=16777216):

"""Compresses an input stream into a file-like buffer.
This reads from the input stream until either we've stored at least length
compressed bytes, or the input stream has been exhausted.
This supports streams of unknown size.
Args:
in_stream: The input stream to read from.
length: The target number of compressed bytes to buffer in the output
stream. The actual length of the output buffer can vary from this
length. If the input stream is exhaused, the output buffer may be
smaller than expected. Because data is written to the output
buffer in increments of the chunksize, the output buffer may be
larger than length by chunksize. Very uncompressible data can
exceed this further if gzip inflates the underlying data.
compresslevel: Optional, defaults to 2. The desired compression level.
chunksize: Optional, defaults to 16MiB. The chunk size used when
reading data from the input stream to write into the output
buffer.
Returns:
A file-like output buffer of compressed bytes, the number of bytes read
from the input stream, and a flag denoting if the input stream was
exhausted.
"""
in_read = 0
in_exhausted = False
out_stream = StreamingBuffer()
with gzip.GzipFile(mode='wb',
fileobj=out_stream,
compresslevel=compresslevel) as compress_stream:
# Read until we've written at least length bytes to the output stream.
while out_stream.length < length:
data = in_stream.read(chunksize)
data_length = len(data)
compress_stream.write(data)
in_read += data_length
# If we read less than requested, the stream is exhausted.
if data_length < chunksize:
in_exhausted = True
break
return out_stream, in_read, in_exhausted


class StreamingBuffer(object):

"""Provides a file-like object that writes to a temporary buffer.
When data is read from the buffer, it is permanently removed. This is
useful when there are memory constraints preventing the entire buffer from
being stored in memory.
"""

def __init__(self):
# The buffer of byte arrays.
self.__buf = deque()
# The number of bytes in __buf.
self.__size = 0

def __len__(self):
return self.__size

def __nonzero__(self):
# For 32-bit python2.x, len() cannot exceed a 32-bit number; avoid
# accidental len() calls from httplib in the form of "if this_object:".
return bool(self.__size)

@property
def length(self):
# For 32-bit python2.x, len() cannot exceed a 32-bit number.
return self.__size

def write(self, data):
# Gzip can write many 0 byte chunks for highly compressible data.
# Prevent them from being added internally.
if data is not None and data:
self.__buf.append(data)
self.__size += len(data)

def read(self, size=None):
"""Read at most size bytes from this buffer.
Bytes read from this buffer are consumed and are permanently removed.
Args:
size: If provided, read no more than size bytes from the buffer.
Otherwise, this reads the entire buffer.
Returns:
The bytes read from this buffer.
"""
if size is None:
size = self.__size
ret_list = []
while size > 0 and self.__buf:
data = self.__buf.popleft()
size -= len(data)
ret_list.append(data)
if size < 0:
ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:]
self.__buf.appendleft(remainder)
ret = b''.join(ret_list)
self.__size -= len(ret)
return ret
133 changes: 133 additions & 0 deletions apitools/base/py/compression_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#!/usr/bin/env python
#
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for compression."""

from apitools.base.py import compression
from apitools.base.py import gzip

import six
import unittest2


class CompressionTest(unittest2.TestCase):

def setUp(self):
# Sample highly compressible data (~50MB).
self.sample_data = b'abc' * 16777216
# Stream of the sample data.
self.stream = six.BytesIO()
self.stream.write(self.sample_data)
self.length = self.stream.tell()
self.stream.seek(0)

def testCompressionExhausted(self):
"""Test full compression.
Test that highly compressible data is actually compressed in entirety.
"""
output, read, exhausted = compression.CompressStream(
self.stream,
self.length,
9)
# Ensure the compressed buffer is smaller than the input buffer.
self.assertLess(output.length, self.length)
# Ensure we read the entire input stream.
self.assertEqual(read, self.length)
# Ensure the input stream was exhausted.
self.assertTrue(exhausted)

def testCompressionPartial(self):
"""Test partial compression.
Test that the length parameter works correctly. The amount of data
that's compressed can be greater than or equal to the requested length.
"""
output_length = 40
output, _, exhausted = compression.CompressStream(
self.stream,
output_length,
9)
# Ensure the requested read size is <= the compressed buffer size.
self.assertLessEqual(output_length, output.length)
# Ensure the input stream was not exhausted.
self.assertFalse(exhausted)

def testCompressionIntegrity(self):
"""Test that compressed data can be decompressed."""
output, read, exhausted = compression.CompressStream(
self.stream,
self.length,
9)
# Ensure uncompressed data matches the sample data.
with gzip.GzipFile(fileobj=output) as f:
original = f.read()
self.assertEqual(original, self.sample_data)
# Ensure we read the entire input stream.
self.assertEqual(read, self.length)
# Ensure the input stream was exhausted.
self.assertTrue(exhausted)


class StreamingBufferTest(unittest2.TestCase):

def setUp(self):
self.stream = compression.StreamingBuffer()

def testSimpleStream(self):
"""Test simple stream operations.
Test that the stream can be written to and read from. Also test that
reading from the stream consumes the bytes.
"""
# Ensure the stream is empty.
self.assertEqual(self.stream.length, 0)
# Ensure data is correctly written.
self.stream.write(b'Sample data')
self.assertEqual(self.stream.length, 11)
# Ensure data can be read and the read data is purged from the stream.
data = self.stream.read(11)
self.assertEqual(data, b'Sample data')
self.assertEqual(self.stream.length, 0)

def testPartialReads(self):
"""Test partial stream reads.
Test that the stream can be read in chunks while perserving the
consumption mechanics.
"""
self.stream.write(b'Sample data')
# Ensure data can be read and the read data is purged from the stream.
data = self.stream.read(6)
self.assertEqual(data, b'Sample')
self.assertEqual(self.stream.length, 5)
# Ensure the remaining data can be read.
data = self.stream.read(5)
self.assertEqual(data, b' data')
self.assertEqual(self.stream.length, 0)

def testTooShort(self):
"""Test excessive stream reads.
Test that more data can be requested from the stream than available
without raising an exception.
"""
self.stream.write(b'Sample')
# Ensure requesting more data than available does not raise an
# exception.
data = self.stream.read(100)
self.assertEqual(data, b'Sample')
self.assertEqual(self.stream.length, 0)
Loading

0 comments on commit b9bdee9

Please sign in to comment.