From 7ccade5be76c6715c0b8d2109ef6b484edadfe97 Mon Sep 17 00:00:00 2001 From: kevinli7 Date: Wed, 6 Dec 2017 15:25:30 -0500 Subject: [PATCH] Fix issue where uploading files would hang if a request failed while using stream_slice.StreamSlice. --- apitools/base/py/transfer.py | 13 ++++- apitools/base/py/transfer_test.py | 88 ++++++++++++++++++++++++++++++- 2 files changed, 99 insertions(+), 2 deletions(-) diff --git a/apitools/base/py/transfer.py b/apitools/base/py/transfer.py index e795a1a2..f4cd8bb3 100644 --- a/apitools/base/py/transfer.py +++ b/apitools/base/py/transfer.py @@ -982,8 +982,13 @@ def __SendMediaBody(self, start, additional_headers=None): if self.total_size is None: raise exceptions.TransferInvalidError( 'Total size must be known for SendMediaBody') + # Change body_stream from a stream to a string object. This is + # because httpwrapper.MakeRequest doesn't handle the case where + # request.body is a stream. In the case that the body is a stream, + # if a request has to be retried, then the stream will be exhausted + # and the request will hang. body_stream = stream_slice.StreamSlice( - self.stream, self.total_size - start) + self.stream, self.total_size - start).read() request = http_wrapper.Request(url=self.url, http_method='PUT', body=body_stream) @@ -1032,6 +1037,12 @@ def __SendChunk(self, start, additional_headers=None): else: end = min(start + self.chunksize, self.total_size) body_stream = stream_slice.StreamSlice(self.stream, end - start) + # Change body_stream from a stream to a string object. This is + # because httpwrapper.MakeRequest doesn't handle the case where + # request.body is a stream. In the case that the body is a stream, + # if a request has to be retried, then the stream will be exhausted + # and the request will hang. + body_stream = body_stream.read() # TODO(craigcitro): Think about clearer errors on "no data in # stream". request.body = body_stream diff --git a/apitools/base/py/transfer_test.py b/apitools/base/py/transfer_test.py index 1febbd49..dd5b9eba 100644 --- a/apitools/base/py/transfer_test.py +++ b/apitools/base/py/transfer_test.py @@ -17,6 +17,7 @@ """Tests for transfer.py.""" import string +import httplib2 import mock import six from six.moves import http_client @@ -310,7 +311,7 @@ def testMultipartEncoding(self): self.assertTrue(rewritten_upload_contents.endswith(upload_bytes)) -class CompressedUploadTest(unittest2.TestCase): +class UploadTest(unittest2.TestCase): def setUp(self): # Sample highly compressible data. @@ -329,6 +330,12 @@ def setUp(self): 'location': 'http://www.uploads.com'}, content='', request_url='http://www.uploads.com',) + # Sample failure response. + self.fail_response = http_wrapper.Response( + info={'status': http_client.SERVICE_UNAVAILABLE, + 'location': 'http://www.uploads.com'}, + content='', + request_url='http://www.uploads.com',) def testStreamInChunksCompressed(self): """Test that StreamInChunks will handle compression correctly.""" @@ -477,3 +484,82 @@ def testMediaCompressed(self): with gzip.GzipFile(fileobj=self.request.body) as f: original = f.read() self.assertTrue(self.sample_data in original) + + def HttpRequestSideEffect(self, responses=None): + responses = [(response.info, response.content) + for response in responses] + + def _side_effect(uri, **kwargs): # pylint: disable=unused-argument + body = kwargs['body'] + read_func = getattr(body, 'read', None) + if read_func: + # If the body is a stream, consume the stream. + body = read_func() + self.assertEqual(int(kwargs['headers']['content-length']), + len(body)) + return responses.pop(0) + return _side_effect + + def testRetryRequestChunks(self): + """Test that StreamInChunks will retry correctly.""" + # Create and configure the upload object. + bytes_http = httplib2.Http() + upload = transfer.Upload( + stream=self.sample_stream, + mime_type='text/plain', + total_size=len(self.sample_data), + close_stream=False, + http=bytes_http) + + upload.strategy = transfer.RESUMABLE_UPLOAD + # Set the chunk size so the entire stream is uploaded. + upload.chunksize = len(self.sample_data) + # Mock the upload to return the sample response. + with mock.patch.object(bytes_http, + 'request') as make_request: + # This side effect also checks the request body. + responses = [ + self.response, # Initial request in InitializeUpload(). + self.fail_response, # 503 status code from server. + self.response # Successful request. + ] + make_request.side_effect = self.HttpRequestSideEffect(responses) + + # Initialization. + upload.InitializeUpload(self.request, bytes_http) + upload.StreamInChunks() + + # Ensure the mock was called the correct number of times. + self.assertEquals(make_request.call_count, len(responses)) + + def testRetryRequestMedia(self): + """Test that StreamMedia will retry correctly.""" + # Create and configure the upload object. + bytes_http = httplib2.Http() + upload = transfer.Upload( + stream=self.sample_stream, + mime_type='text/plain', + total_size=len(self.sample_data), + close_stream=False, + http=bytes_http) + + upload.strategy = transfer.RESUMABLE_UPLOAD + # Set the chunk size so the entire stream is uploaded. + upload.chunksize = len(self.sample_data) + # Mock the upload to return the sample response. + with mock.patch.object(bytes_http, + 'request') as make_request: + # This side effect also checks the request body. + responses = [ + self.response, # Initial request in InitializeUpload(). + self.fail_response, # 503 status code from server. + self.response # Successful request. + ] + make_request.side_effect = self.HttpRequestSideEffect(responses) + + # Initialization. + upload.InitializeUpload(self.request, bytes_http) + upload.StreamMedia() + + # Ensure the mock was called the correct number of times.. + self.assertEquals(make_request.call_count, len(responses))