diff --git a/apitools/base/py/compression.py b/apitools/base/py/compression.py
new file mode 100644
index 00000000..664f4386
--- /dev/null
+++ b/apitools/base/py/compression.py
@@ -0,0 +1,135 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Compression support for apitools."""
+
+from collections import deque
+
+from apitools.base.py import gzip
+
+__all__ = [
+    'CompressStream',
+]
+
+
+# pylint: disable=invalid-name
+def CompressStream(in_stream, length, compresslevel=2, chunksize=16777216):
+
+    """Compresses an input stream into a file-like buffer.
+
+    This reads from the input stream until either we've stored at least length
+    compressed bytes, or the input stream has been exhausted.
+
+    This supports streams of unknown size.
+
+    Args:
+        in_stream: The input stream to read from.
+        length: The target number of compressed bytes to buffer in the output
+            stream. The actual length of the output buffer can vary from this
+            length. If the input stream is exhausted, the output buffer may be
+            smaller than expected. Because data is written to the output
+            buffer in increments of the chunksize, the output buffer may be
+            larger than length by chunksize. Very uncompressible data can
+            exceed this further if gzip inflates the underlying data.
+        compresslevel: Optional, defaults to 2. The desired compression level.
+        chunksize: Optional, defaults to 16MiB. The chunk size used when
+            reading data from the input stream to write into the output
+            buffer.
+
+    Returns:
+        A file-like output buffer of compressed bytes, the number of bytes read
+        from the input stream, and a flag denoting if the input stream was
+        exhausted.
+    """
+    in_read = 0
+    in_exhausted = False
+    out_stream = StreamingBuffer()
+    with gzip.GzipFile(mode='wb',
+                       fileobj=out_stream,
+                       compresslevel=compresslevel) as compress_stream:
+        # Read until we've written at least length bytes to the output stream.
+        while out_stream.length < length:
+            data = in_stream.read(chunksize)
+            data_length = len(data)
+            compress_stream.write(data)
+            in_read += data_length
+            # If we read less than requested, the stream is exhausted.
+            if data_length < chunksize:
+                in_exhausted = True
+                break
+    return out_stream, in_read, in_exhausted
+
+
+class StreamingBuffer(object):
+
+    """Provides a file-like object that writes to a temporary buffer.
+
+    When data is read from the buffer, it is permanently removed. This is
+    useful when there are memory constraints preventing the entire buffer from
+    being stored in memory.
+    """
+
+    def __init__(self):
+        # The buffer of byte arrays.
+        self.__buf = deque()
+        # The number of bytes in __buf.
+        self.__size = 0
+
+    def __len__(self):
+        return self.__size
+
+    def __nonzero__(self):
+        # For 32-bit python2.x, len() cannot exceed a 32-bit number; avoid
+        # accidental len() calls from httplib in the form of "if this_object:".
+        return bool(self.__size)
+
+    @property
+    def length(self):
+        # For 32-bit python2.x, len() cannot exceed a 32-bit number.
+        return self.__size
+
+    def write(self, data):
+        # Gzip can write many 0 byte chunks for highly compressible data.
+        # Prevent them from being added internally.
+        if data is not None and data:
+            self.__buf.append(data)
+            self.__size += len(data)
+
+    def read(self, size=None):
+        """Read at most size bytes from this buffer.
+
+        Bytes read from this buffer are consumed and are permanently removed.
+
+        Args:
+            size: If provided, read no more than size bytes from the buffer.
+                Otherwise, this reads the entire buffer.
+
+        Returns:
+            The bytes read from this buffer.
+        """
+        if size is None:
+            size = self.__size
+        ret_list = []
+        while size > 0 and self.__buf:
+            data = self.__buf.popleft()
+            size -= len(data)
+            ret_list.append(data)
+        if size < 0:
+            ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:]
+            self.__buf.appendleft(remainder)
+        ret = b''.join(ret_list)
+        self.__size -= len(ret)
+        return ret
diff --git a/apitools/base/py/compression_test.py b/apitools/base/py/compression_test.py
new file mode 100644
index 00000000..14c2a54e
--- /dev/null
+++ b/apitools/base/py/compression_test.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for compression."""
+
+from apitools.base.py import compression
+from apitools.base.py import gzip
+
+import six
+import unittest2
+
+
+class CompressionTest(unittest2.TestCase):
+
+    def setUp(self):
+        # Sample highly compressible data (~50MB).
+        self.sample_data = b'abc' * 16777216
+        # Stream of the sample data.
+        self.stream = six.BytesIO()
+        self.stream.write(self.sample_data)
+        self.length = self.stream.tell()
+        self.stream.seek(0)
+
+    def testCompressionExhausted(self):
+        """Test full compression.
+
+        Test that highly compressible data is actually compressed in entirety.
+        """
+        output, read, exhausted = compression.CompressStream(
+            self.stream,
+            self.length,
+            9)
+        # Ensure the compressed buffer is smaller than the input buffer.
+        self.assertLess(output.length, self.length)
+        # Ensure we read the entire input stream.
+        self.assertEqual(read, self.length)
+        # Ensure the input stream was exhausted.
+        self.assertTrue(exhausted)
+
+    def testCompressionPartial(self):
+        """Test partial compression.
+
+        Test that the length parameter works correctly. The amount of data
+        that's compressed can be greater than or equal to the requested length.
+        """
+        output_length = 40
+        output, _, exhausted = compression.CompressStream(
+            self.stream,
+            output_length,
+            9)
+        # Ensure the requested read size is <= the compressed buffer size.
+        self.assertLessEqual(output_length, output.length)
+        # Ensure the input stream was not exhausted.
+        self.assertFalse(exhausted)
+
+    def testCompressionIntegrity(self):
+        """Test that compressed data can be decompressed."""
+        output, read, exhausted = compression.CompressStream(
+            self.stream,
+            self.length,
+            9)
+        # Ensure uncompressed data matches the sample data.
+        with gzip.GzipFile(fileobj=output) as f:
+            original = f.read()
+            self.assertEqual(original, self.sample_data)
+        # Ensure we read the entire input stream.
+        self.assertEqual(read, self.length)
+        # Ensure the input stream was exhausted.
+        self.assertTrue(exhausted)
+
+
+class StreamingBufferTest(unittest2.TestCase):
+
+    def setUp(self):
+        self.stream = compression.StreamingBuffer()
+
+    def testSimpleStream(self):
+        """Test simple stream operations.
+
+        Test that the stream can be written to and read from. Also test that
+        reading from the stream consumes the bytes.
+        """
+        # Ensure the stream is empty.
+        self.assertEqual(self.stream.length, 0)
+        # Ensure data is correctly written.
+        self.stream.write(b'Sample data')
+        self.assertEqual(self.stream.length, 11)
+        # Ensure data can be read and the read data is purged from the stream.
+        data = self.stream.read(11)
+        self.assertEqual(data, b'Sample data')
+        self.assertEqual(self.stream.length, 0)
+
+    def testPartialReads(self):
+        """Test partial stream reads.
+
+        Test that the stream can be read in chunks while preserving the
+        consumption mechanics.
+        """
+        self.stream.write(b'Sample data')
+        # Ensure data can be read and the read data is purged from the stream.
+        data = self.stream.read(6)
+        self.assertEqual(data, b'Sample')
+        self.assertEqual(self.stream.length, 5)
+        # Ensure the remaining data can be read.
+        data = self.stream.read(5)
+        self.assertEqual(data, b' data')
+        self.assertEqual(self.stream.length, 0)
+
+    def testTooShort(self):
+        """Test excessive stream reads.
+
+        Test that more data can be requested from the stream than available
+        without raising an exception.
+        """
+        self.stream.write(b'Sample')
+        # Ensure requesting more data than available does not raise an
+        # exception.
+        data = self.stream.read(100)
+        self.assertEqual(data, b'Sample')
+        self.assertEqual(self.stream.length, 0)
diff --git a/apitools/base/py/gzip.py b/apitools/base/py/gzip.py
new file mode 100644
index 00000000..1fddb8bf
--- /dev/null
+++ b/apitools/base/py/gzip.py
@@ -0,0 +1,617 @@
+# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+# 2011, 2012, 2013, 2014, 2015, 2016, 2017 Python Software Foundation; All
+# Rights Reserved
+#
+# This is a backport from python 3.4 into python 2.7. Text and exclusive mode
+# support are removed as they're unsupported in 2.7. This backport patches a
+# streaming bug that exists in python 2.7.
+
+"""Functions that read and write gzipped files.
+
+The user of the file doesn't have to worry about the compression,
+but random access is not allowed."""
+
+# based on Andrew Kuchling's minigzip.py distributed with the zlib module
+
+import six
+from six.moves import builtins
+from six.moves import range
+
+import struct
+import sys
+import time
+import os
+import zlib
+import io
+
+__all__ = ["GzipFile", "open", "compress", "decompress"]
+
+FTEXT, FHCRC, FEXTRA, FNAME, FCOMMENT = 1, 2, 4, 8, 16
+
+READ, WRITE = 1, 2
+
+
+def open(filename, mode="rb", compresslevel=9):
+    """Shorthand for GzipFile(filename, mode, compresslevel).
+
+    The filename argument is required; mode defaults to 'rb'
+    and compresslevel defaults to 9.
+
+    """
+    return GzipFile(filename, mode, compresslevel)
+
+
+def write32u(output, value):
+    # The L format writes the bit pattern correctly whether signed
+    # or unsigned.
+    output.write(struct.pack("<L", value))
+
+    def _check_closed(self):
+        """Raises a ValueError if the underlying file object has been closed.
+ + """ + if self.closed: + raise ValueError('I/O operation on closed file.') + + def _init_write(self, filename): + self.name = filename + self.crc = zlib.crc32(b"") & 0xffffffff + self.size = 0 + self.writebuf = [] + self.bufsize = 0 + + def _write_gzip_header(self): + self.fileobj.write(b'\037\213') # magic header + self.fileobj.write(b'\010') # compression method + try: + # RFC 1952 requires the FNAME field to be Latin-1. Do not + # include filenames that cannot be represented that way. + fname = os.path.basename(self.name) + if not isinstance(fname, six.binary_type): + fname = fname.encode('latin-1') + if fname.endswith(b'.gz'): + fname = fname[:-3] + except UnicodeEncodeError: + fname = b'' + flags = 0 + if fname: + flags = FNAME + self.fileobj.write(six.unichr(flags).encode('latin-1')) + mtime = self.mtime + if mtime is None: + mtime = time.time() + write32u(self.fileobj, int(mtime)) + self.fileobj.write(b'\002') + self.fileobj.write(b'\377') + if fname: + self.fileobj.write(fname + b'\000') + + def _init_read(self): + self.crc = zlib.crc32(b"") & 0xffffffff + self.size = 0 + + def _read_exact(self, n): + data = self.fileobj.read(n) + while len(data) < n: + b = self.fileobj.read(n - len(data)) + if not b: + raise EOFError("Compressed file ended before the " + "end-of-stream marker was reached") + data += b + return data + + def _read_gzip_header(self): + magic = self.fileobj.read(2) + if magic == b'': + return False + + if magic != b'\037\213': + raise OSError('Not a gzipped file') + + method, flag, self.mtime = struct.unpack(" 0: + self.fileobj.write(self.compress.compress(data)) + self.size += len(data) + self.crc = zlib.crc32(data, self.crc) & 0xffffffff + self.offset += len(data) + + return len(data) + + def read(self, size=-1): + self._check_closed() + if self.mode != READ: + import errno + raise OSError(errno.EBADF, "read() on write-only GzipFile object") + + if self.extrasize <= 0 and self.fileobj is None: + return b'' + + readsize = 1024 + if size < 0: # get the whole thing + while self._read(readsize): + readsize = min(self.max_read_chunk, readsize * 2) + size = self.extrasize + else: # just get some more of it + while size > self.extrasize: + if not self._read(readsize): + if size > self.extrasize: + size = self.extrasize + break + readsize = min(self.max_read_chunk, readsize * 2) + + offset = self.offset - self.extrastart + chunk = self.extrabuf[offset: offset + size] + self.extrasize = self.extrasize - size + + self.offset += size + return chunk + + def read1(self, size=-1): + self._check_closed() + if self.mode != READ: + import errno + raise OSError(errno.EBADF, "read1() on write-only GzipFile object") + + if self.extrasize <= 0 and self.fileobj is None: + return b'' + + # For certain input data, a single call to _read() may not return + # any data. In this case, retry until we get some data or reach EOF. + while self.extrasize <= 0 and self._read(): + pass + if size < 0 or size > self.extrasize: + size = self.extrasize + + offset = self.offset - self.extrastart + chunk = self.extrabuf[offset: offset + size] + self.extrasize -= size + self.offset += size + return chunk + + def peek(self, n): + if self.mode != READ: + import errno + raise OSError(errno.EBADF, "peek() on write-only GzipFile object") + + # Do not return ridiculously small buffers, for one common idiom + # is to call peek(1) and expect more bytes in return. 
+ if n < 100: + n = 100 + if self.extrasize == 0: + if self.fileobj is None: + return b'' + # Ensure that we don't return b"" if we haven't reached EOF. + # 1024 is the same buffering heuristic used in read() + while self.extrasize == 0 and self._read(max(n, 1024)): + pass + offset = self.offset - self.extrastart + remaining = self.extrasize + assert remaining == len(self.extrabuf) - offset + return self.extrabuf[offset:offset + n] + + def _unread(self, buf): + self.extrasize = len(buf) + self.extrasize + self.offset -= len(buf) + + def _read(self, size=1024): + if self.fileobj is None: + return False + + if self._new_member: + # If the _new_member flag is set, we have to + # jump to the next member, if there is one. + self._init_read() + if not self._read_gzip_header(): + return False + self.decompress = zlib.decompressobj(-zlib.MAX_WBITS) + self._new_member = False + + # Read a chunk of data from the file + buf = self.fileobj.read(size) + + # If the EOF has been reached, flush the decompression object + # and mark this object as finished. + + if buf == b"": + uncompress = self.decompress.flush() + # Prepend the already read bytes to the fileobj to they can be + # seen by _read_eof() + self.fileobj.prepend(self.decompress.unused_data, True) + self._read_eof() + self._add_read_data(uncompress) + return False + + uncompress = self.decompress.decompress(buf) + self._add_read_data(uncompress) + + if self.decompress.unused_data != b"": + # Ending case: we've come to the end of a member in the file, + # so seek back to the start of the unused data, finish up + # this member, and read a new gzip header. + # Prepend the already read bytes to the fileobj to they can be + # seen by _read_eof() and _read_gzip_header() + self.fileobj.prepend(self.decompress.unused_data, True) + # Check the CRC and file size, and set the flag so we read + # a new member on the next call + self._read_eof() + self._new_member = True + return True + + def _add_read_data(self, data): + self.crc = zlib.crc32(data, self.crc) & 0xffffffff + offset = self.offset - self.extrastart + self.extrabuf = self.extrabuf[offset:] + data + self.extrasize = self.extrasize + len(data) + self.extrastart = self.offset + self.size = self.size + len(data) + + def _read_eof(self): + # We've read to the end of the file + # We check the that the computed CRC and size of the + # uncompressed data matches the stored values. Note that the size + # stored is the true file size mod 2**32. + crc32, isize = struct.unpack(" 0: + self.extrasize -= i - offset + self.offset += i - offset + return self.extrabuf[offset: i] + + size = sys.maxsize + readsize = self.min_readsize + else: + readsize = size + bufs = [] + while size != 0: + c = self.read(readsize) + i = c.find(b'\n') + + # We set i=size to break out of the loop under two + # conditions: 1) there's no newline, and the chunk is + # larger than size, or 2) there is a newline, but the + # resulting line would be longer than 'size'. + if (size <= i) or (i == -1 and len(c) > size): + i = size - 1 + + if i >= 0 or c == b'': + bufs.append(c[:i + 1]) # Add portion of last chunk + self._unread(c[i + 1:]) # Push back rest of chunk + break + + # Append chunk to list, decrease 'size', + bufs.append(c) + size = size - len(c) + readsize = min(size, readsize * 2) + if readsize > self.min_readsize: + self.min_readsize = min(readsize, self.min_readsize * 2, 512) + return b''.join(bufs) # Return resulting line + + +def compress(data, compresslevel=9): + """Compress data in one shot and return the compressed string. 
+ Optional argument is the compression level, in range of 0-9. + """ + buf = io.BytesIO() + with GzipFile(fileobj=buf, mode='wb', compresslevel=compresslevel) as f: + f.write(data) + return buf.getvalue() + + +def decompress(data): + """Decompress a gzip compressed string in one shot. + Return the decompressed string. + """ + with GzipFile(fileobj=io.BytesIO(data)) as f: + return f.read() diff --git a/apitools/base/py/gzip_test.py b/apitools/base/py/gzip_test.py new file mode 100644 index 00000000..2d7d4580 --- /dev/null +++ b/apitools/base/py/gzip_test.py @@ -0,0 +1,514 @@ +# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, +# 2011, 2012, 2013, 2014, 2015, 2016, 2017 Python Software Foundation; All +# Rights Reserved +# +# This is a backport from python 3.4 into python 2.7. Text and exclusive mode +# support are removed as they're unsupported in 2.7. This backport patches a +# streaming bug that exists in python 2.7. + +"""Test script for the gzip module. +""" + +import six +from six.moves import range + +import unittest +import os +import io +import struct +from apitools.base.py import gzip +from io import open + +data1 = b""" int length=DEFAULTALLOC, err = Z_OK; + PyObject *RetVal; + int flushmode = Z_FINISH; + unsigned long start_total_out; + +""" + +data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */ +/* See http://www.gzip.org/zlib/ +/* See http://www.winimage.com/zLibDll for Windows */ +""" + + +def unlink(filename): + try: + os.unlink(filename) + except: + pass + + +class UnseekableIO(io.BytesIO): + def seekable(self): + return False + + def tell(self): + raise io.UnsupportedOperation + + def seek(self, *args): + raise io.UnsupportedOperation + + +class BaseTest(unittest.TestCase): + filename = "@test" + + def setUp(self): + unlink(self.filename) + + def tearDown(self): + unlink(self.filename) + + +class TestGzip(BaseTest): + def write_and_read_back(self, data, mode='b'): + b_data = bytes(data) + with gzip.GzipFile(self.filename, 'w' + mode) as f: + l = f.write(data) + self.assertEqual(l, len(b_data)) + with gzip.GzipFile(self.filename, 'r' + mode) as f: + self.assertEqual(f.read(), b_data) + + def test_write(self): + with gzip.GzipFile(self.filename, 'wb') as f: + f.write(data1 * 50) + + # Try flush and fileno. + f.flush() + f.fileno() + if hasattr(os, 'fsync'): + os.fsync(f.fileno()) + f.close() + + # Test multiple close() calls. + f.close() + + # The following test_write_xy methods test that write accepts + # the corresponding bytes-like object type as input + # and that the data written equals bytes(xy) in all cases. + def test_write_memoryview(self): + data = memoryview(data1 * 50) + self.write_and_read_back(data.tobytes()) + data = memoryview(bytes(range(256))) + self.write_and_read_back(data.tobytes()) + + def test_write_incompatible_type(self): + # Test that non-bytes-like types raise TypeError. + # Issue #21560: attempts to write incompatible types + # should not affect the state of the fileobject + with gzip.GzipFile(self.filename, 'wb') as f: + if six.PY2: + with self.assertRaises(UnicodeEncodeError): + f.write(u'\xff') + elif six.PY3: + with self.assertRaises(TypeError): + f.write(u'\xff') + with self.assertRaises(TypeError): + f.write([1]) + f.write(data1) + with gzip.GzipFile(self.filename, 'rb') as f: + self.assertEqual(f.read(), data1) + + def test_read(self): + self.test_write() + # Try reading. 
+ with gzip.GzipFile(self.filename, 'r') as f: + d = f.read() + self.assertEqual(d, data1 * 50) + + def test_read1(self): + self.test_write() + blocks = [] + nread = 0 + with gzip.GzipFile(self.filename, 'r') as f: + while True: + d = f.read1() + if not d: + break + blocks.append(d) + nread += len(d) + # Check that position was updated correctly (see issue10791). + self.assertEqual(f.tell(), nread) + self.assertEqual(b''.join(blocks), data1 * 50) + + def test_io_on_closed_object(self): + # Test that I/O operations on closed GzipFile objects raise a + # ValueError, just like the corresponding functions on file objects. + + # Write to a file, open it for reading, then close it. + self.test_write() + f = gzip.GzipFile(self.filename, 'r') + f.close() + with self.assertRaises(ValueError): + f.read(1) + with self.assertRaises(ValueError): + f.seek(0) + with self.assertRaises(ValueError): + f.tell() + # Open the file for writing, then close it. + f = gzip.GzipFile(self.filename, 'w') + f.close() + with self.assertRaises(ValueError): + f.write(b'') + with self.assertRaises(ValueError): + f.flush() + + def test_append(self): + self.test_write() + # Append to the previous file + with gzip.GzipFile(self.filename, 'ab') as f: + f.write(data2 * 15) + + with gzip.GzipFile(self.filename, 'rb') as f: + d = f.read() + self.assertEqual(d, (data1 * 50) + (data2 * 15)) + + def test_many_append(self): + # Bug #1074261 was triggered when reading a file that contained + # many, many members. Create such a file and verify that reading it + # works. + with gzip.GzipFile(self.filename, 'wb', 9) as f: + f.write(b'a') + for i in range(0, 200): + with gzip.GzipFile(self.filename, "ab", 9) as f: # append + f.write(b'a') + + # Try reading the file + with gzip.GzipFile(self.filename, "rb") as zgfile: + contents = b"" + while 1: + ztxt = zgfile.read(8192) + contents += ztxt + if not ztxt: + break + self.assertEqual(contents, b'a' * 201) + + def test_buffered_reader(self): + # Issue #7471: a GzipFile can be wrapped in a BufferedReader for + # performance. 
+ self.test_write() + + with gzip.GzipFile(self.filename, 'rb') as f: + with io.BufferedReader(f) as r: + lines = [line for line in r] + + self.assertEqual(lines, 50 * data1.splitlines(True)) + + def test_readline(self): + self.test_write() + # Try .readline() with varying line lengths + + with gzip.GzipFile(self.filename, 'rb') as f: + line_length = 0 + while 1: + L = f.readline(line_length) + if not L and line_length != 0: + break + self.assertTrue(len(L) <= line_length) + line_length = (line_length + 1) % 50 + + def test_readlines(self): + self.test_write() + # Try .readlines() + + with gzip.GzipFile(self.filename, 'rb') as f: + L = f.readlines() + + with gzip.GzipFile(self.filename, 'rb') as f: + while 1: + L = f.readlines(150) + if L == []: + break + + def test_seek_read(self): + self.test_write() + # Try seek, read test + + with gzip.GzipFile(self.filename) as f: + while 1: + oldpos = f.tell() + line1 = f.readline() + if not line1: + break + newpos = f.tell() + f.seek(oldpos) # negative seek + if len(line1) > 10: + amount = 10 + else: + amount = len(line1) + line2 = f.read(amount) + self.assertEqual(line1[:amount], line2) + f.seek(newpos) # positive seek + + def test_seek_whence(self): + self.test_write() + # Try seek(whence=1), read test + + with gzip.GzipFile(self.filename) as f: + f.read(10) + f.seek(10, whence=1) + y = f.read(10) + self.assertEqual(y, data1[20:30]) + + def test_seek_write(self): + # Try seek, write test + with gzip.GzipFile(self.filename, 'w') as f: + for pos in range(0, 256, 16): + f.seek(pos) + f.write(b'GZ\n') + + def test_mode(self): + self.test_write() + with gzip.GzipFile(self.filename, 'r') as f: + self.assertEqual(f.myfileobj.mode, 'rb') + + def test_1647484(self): + for mode in ('wb', 'rb'): + with gzip.GzipFile(self.filename, mode) as f: + self.assertTrue(hasattr(f, "name")) + self.assertEqual(f.name, self.filename) + + def test_paddedfile_getattr(self): + self.test_write() + with gzip.GzipFile(self.filename, 'rb') as f: + self.assertTrue(hasattr(f.fileobj, "name")) + self.assertEqual(f.fileobj.name, self.filename) + + def test_mtime(self): + mtime = 123456789 + with gzip.GzipFile(self.filename, 'w', mtime=mtime) as fWrite: + fWrite.write(data1) + with gzip.GzipFile(self.filename) as fRead: + dataRead = fRead.read() + self.assertEqual(dataRead, data1) + self.assertTrue(hasattr(fRead, 'mtime')) + self.assertEqual(fRead.mtime, mtime) + + def test_metadata(self): + mtime = 123456789 + + with gzip.GzipFile(self.filename, 'w', mtime=mtime) as fWrite: + fWrite.write(data1) + + with open(self.filename, 'rb') as fRead: + # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html + + idBytes = fRead.read(2) + self.assertEqual(idBytes, b'\x1f\x8b') # gzip ID + + cmByte = fRead.read(1) + self.assertEqual(cmByte, b'\x08') # deflate + + flagsByte = fRead.read(1) + self.assertEqual(flagsByte, b'\x08') # only the FNAME flag is set + + mtimeBytes = fRead.read(4) + self.assertEqual(mtimeBytes, struct.pack( + '