diff --git a/apitools/base/py/transfer.py b/apitools/base/py/transfer.py index e795a1a2..fa6de7e2 100644 --- a/apitools/base/py/transfer.py +++ b/apitools/base/py/transfer.py @@ -165,12 +165,12 @@ def _type_name(self): def EnsureInitialized(self): if not self.initialized: raise exceptions.TransferInvalidError( - 'Cannot use uninitialized %s', self._type_name) + 'Cannot use uninitialized %s' % self._type_name) def EnsureUninitialized(self): if self.initialized: raise exceptions.TransferInvalidError( - 'Cannot re-initialize %s', self._type_name) + 'Cannot re-initialize %s' % self._type_name) def __del__(self): if self.__close_stream: @@ -284,6 +284,7 @@ def ConfigureRequest(self, http_request, url_builder): http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,) def __SetTotal(self, info): + """Sets the total size based off info if possible otherwise 0.""" if 'content-range' in info: _, _, total = info['content-range'].rpartition('/') if total != '*': @@ -331,6 +332,7 @@ def InitializeDownload(self, http_request, http=None, client=None): self.StreamInChunks() def __NormalizeStartEnd(self, start, end=None): + """Normalizes start and end values based on total size.""" if end is not None: if start < 0: raise exceptions.TransferInvalidError( @@ -879,9 +881,14 @@ def __ValidateChunksize(self, chunksize=None): chunksize = chunksize or self.chunksize if chunksize % self.__server_chunk_granularity: raise exceptions.ConfigurationValueError( - 'Server requires chunksize to be a multiple of %d', + 'Server requires chunksize to be a multiple of %d' % self.__server_chunk_granularity) + def __IsRetryable(self, response): + return (response.status_code >= 500 or + response.status_code == http_wrapper.TOO_MANY_REQUESTS or + response.retry_after) + def __StreamMedia(self, callback=None, finish_callback=None, additional_headers=None, use_chunks=True): """Helper function for StreamMedia / StreamInChunks.""" @@ -913,7 +920,23 @@ def CallSendMediaBody(start): if response.status_code in (http_client.OK, http_client.CREATED): self.__complete = True break - self.__progress = self.__GetLastByte(response.info['range']) + if response.status_code not in ( + http_client.OK, http_client.CREATED, + http_wrapper.RESUME_INCOMPLETE): + # Only raise an exception if the error is something we can't + # recover from. + if (self.strategy != RESUMABLE_UPLOAD or + not self.__IsRetryable(response)): + raise exceptions.HttpError.FromResponse(response) + # We want to reset our state to wherever the server left us + # before this failed request, and then raise. + self.RefreshResumableUploadState() + + self._ExecuteCallback(callback, response) + continue + + self.__progress = self.__GetLastByte( + self._GetRangeHeaderFromResponse(response)) if self.progress + 1 != self.stream.tell(): # TODO(craigcitro): Add a better way to recover here. raise exceptions.CommunicationError( @@ -960,20 +983,21 @@ def StreamInChunks(self, callback=None, finish_callback=None, def __SendMediaRequest(self, request, end): """Request helper function for SendMediaBody & SendChunk.""" + def CheckResponse(response): + if response is None: + # Caller shouldn't call us if the response is None, + # but handle anyway. + raise exceptions.RequestError( + 'Request to url %s did not return a response.' % + response.request_url) response = http_wrapper.MakeRequest( self.bytes_http, request, retry_func=self.retry_func, - retries=self.num_retries) - if response.status_code not in (http_client.OK, http_client.CREATED, - http_wrapper.RESUME_INCOMPLETE): - # We want to reset our state to wherever the server left us - # before this failed request, and then raise. - self.RefreshResumableUploadState() - raise exceptions.HttpError.FromResponse(response) + retries=self.num_retries, check_response_func=CheckResponse) if response.status_code == http_wrapper.RESUME_INCOMPLETE: last_byte = self.__GetLastByte( self._GetRangeHeaderFromResponse(response)) if last_byte + 1 != end: - self.stream.seek(last_byte) + self.stream.seek(last_byte + 1) return response def __SendMediaBody(self, start, additional_headers=None): diff --git a/apitools/base/py/transfer_test.py b/apitools/base/py/transfer_test.py index a944b057..63d5e351 100644 --- a/apitools/base/py/transfer_test.py +++ b/apitools/base/py/transfer_test.py @@ -17,6 +17,7 @@ """Tests for transfer.py.""" import string +import httplib2 import mock import six from six.moves import http_client @@ -360,11 +361,9 @@ def testStreamInChunksCompressed(self): upload.InitializeUpload(self.request, 'http') upload.StreamInChunks() # Get the uploaded request and end position of the stream. - (request, end), _ = mock_result.call_args_list[0] + (request, _), _ = mock_result.call_args_list[0] # Ensure the mock was called. self.assertTrue(mock_result.called) - # Ensure the stream was fully read. - self.assertEqual(len(self.sample_data), end) # Ensure the correct content encoding was set. self.assertEqual(request.headers['Content-Encoding'], 'gzip') # Ensure the stream was compresed. @@ -498,3 +497,80 @@ def _side_effect(uri, **kwargs): # pylint: disable=unused-argument len(body)) return responses.pop(0) return _side_effect + + def testRetryRequestChunks(self): + """Test that StreamInChunks will retry correctly.""" + refresh_response = http_wrapper.Response( + info={'status': http_wrapper.RESUME_INCOMPLETE, + 'location': 'http://www.uploads.com'}, + content='', + request_url='http://www.uploads.com',) + + # Create and configure the upload object. + bytes_http = httplib2.Http() + upload = transfer.Upload( + stream=self.sample_stream, + mime_type='text/plain', + total_size=len(self.sample_data), + close_stream=False, + http=bytes_http) + + upload.strategy = transfer.RESUMABLE_UPLOAD + # Set the chunk size so the entire stream is uploaded. + upload.chunksize = len(self.sample_data) + # Mock the upload to return the sample response. + with mock.patch.object(bytes_http, + 'request') as make_request: + # This side effect also checks the request body. + responses = [ + self.response, # Initial request in InitializeUpload(). + self.fail_response, # 503 status code from server. + refresh_response, # Refresh upload progress. + self.response, # Successful request. + ] + make_request.side_effect = self.HttpRequestSideEffect(responses) + + # Initialization. + upload.InitializeUpload(self.request, bytes_http) + upload.StreamInChunks() + + # Ensure the mock was called the correct number of times. + self.assertEquals(make_request.call_count, len(responses)) + + def testStreamInChunks(self): + """Test StreamInChunks.""" + resume_incomplete_responses = [http_wrapper.Response( + info={'status': http_wrapper.RESUME_INCOMPLETE, + 'location': 'http://www.uploads.com', + 'range': '0-{}'.format(end)}, + content='', + request_url='http://www.uploads.com',) for end in [199, 399, 599]] + responses = [ + self.response # Initial request in InitializeUpload(). + ] + resume_incomplete_responses + [ + self.response, # Successful request. + ] + # Create and configure the upload object. + bytes_http = httplib2.Http() + upload = transfer.Upload( + stream=self.sample_stream, + mime_type='text/plain', + total_size=len(self.sample_data), + close_stream=False, + http=bytes_http) + + upload.strategy = transfer.RESUMABLE_UPLOAD + # Set the chunk size so the entire stream is uploaded. + upload.chunksize = 200 + # Mock the upload to return the sample response. + with mock.patch.object(bytes_http, + 'request') as make_request: + # This side effect also checks the request body. + make_request.side_effect = self.HttpRequestSideEffect(responses) + + # Initialization. + upload.InitializeUpload(self.request, bytes_http) + upload.StreamInChunks() + + # Ensure the mock was called the correct number of times. + self.assertEquals(make_request.call_count, len(responses)) diff --git a/default.pylintrc b/default.pylintrc index 7b9c3c45..2b06d98a 100644 --- a/default.pylintrc +++ b/default.pylintrc @@ -49,6 +49,7 @@ disable = cyclic-import, fixme, import-error, + inconsistent-return-statements, locally-disabled, locally-enabled, no-member,