Skip to content

Commit

Permalink
Merge pull request #198 from kevinli7/transfer-fix
Browse files Browse the repository at this point in the history
Fix issue where resumable uploads weren't resuming correctly on 500s.
  • Loading branch information
kevinli7 authored Jan 4, 2018
2 parents 4a3dfa2 + 3cca793 commit 8f99093
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 15 deletions.
48 changes: 36 additions & 12 deletions apitools/base/py/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ def _type_name(self):
def EnsureInitialized(self):
if not self.initialized:
raise exceptions.TransferInvalidError(
'Cannot use uninitialized %s', self._type_name)
'Cannot use uninitialized %s' % self._type_name)

def EnsureUninitialized(self):
if self.initialized:
raise exceptions.TransferInvalidError(
'Cannot re-initialize %s', self._type_name)
'Cannot re-initialize %s' % self._type_name)

def __del__(self):
if self.__close_stream:
Expand Down Expand Up @@ -284,6 +284,7 @@ def ConfigureRequest(self, http_request, url_builder):
http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,)

def __SetTotal(self, info):
"""Sets the total size based off info if possible otherwise 0."""
if 'content-range' in info:
_, _, total = info['content-range'].rpartition('/')
if total != '*':
Expand Down Expand Up @@ -331,6 +332,7 @@ def InitializeDownload(self, http_request, http=None, client=None):
self.StreamInChunks()

def __NormalizeStartEnd(self, start, end=None):
"""Normalizes start and end values based on total size."""
if end is not None:
if start < 0:
raise exceptions.TransferInvalidError(
Expand Down Expand Up @@ -879,9 +881,14 @@ def __ValidateChunksize(self, chunksize=None):
chunksize = chunksize or self.chunksize
if chunksize % self.__server_chunk_granularity:
raise exceptions.ConfigurationValueError(
'Server requires chunksize to be a multiple of %d',
'Server requires chunksize to be a multiple of %d' %
self.__server_chunk_granularity)

def __IsRetryable(self, response):
return (response.status_code >= 500 or
response.status_code == http_wrapper.TOO_MANY_REQUESTS or
response.retry_after)

def __StreamMedia(self, callback=None, finish_callback=None,
additional_headers=None, use_chunks=True):
"""Helper function for StreamMedia / StreamInChunks."""
Expand Down Expand Up @@ -913,7 +920,23 @@ def CallSendMediaBody(start):
if response.status_code in (http_client.OK, http_client.CREATED):
self.__complete = True
break
self.__progress = self.__GetLastByte(response.info['range'])
if response.status_code not in (
http_client.OK, http_client.CREATED,
http_wrapper.RESUME_INCOMPLETE):
# Only raise an exception if the error is something we can't
# recover from.
if (self.strategy != RESUMABLE_UPLOAD or
not self.__IsRetryable(response)):
raise exceptions.HttpError.FromResponse(response)
# We want to reset our state to wherever the server left us
# before this failed request, and then raise.
self.RefreshResumableUploadState()

self._ExecuteCallback(callback, response)
continue

self.__progress = self.__GetLastByte(
self._GetRangeHeaderFromResponse(response))
if self.progress + 1 != self.stream.tell():
# TODO(craigcitro): Add a better way to recover here.
raise exceptions.CommunicationError(
Expand Down Expand Up @@ -960,20 +983,21 @@ def StreamInChunks(self, callback=None, finish_callback=None,

def __SendMediaRequest(self, request, end):
"""Request helper function for SendMediaBody & SendChunk."""
def CheckResponse(response):
if response is None:
# Caller shouldn't call us if the response is None,
# but handle anyway.
raise exceptions.RequestError(
'Request to url %s did not return a response.' %
response.request_url)
response = http_wrapper.MakeRequest(
self.bytes_http, request, retry_func=self.retry_func,
retries=self.num_retries)
if response.status_code not in (http_client.OK, http_client.CREATED,
http_wrapper.RESUME_INCOMPLETE):
# We want to reset our state to wherever the server left us
# before this failed request, and then raise.
self.RefreshResumableUploadState()
raise exceptions.HttpError.FromResponse(response)
retries=self.num_retries, check_response_func=CheckResponse)
if response.status_code == http_wrapper.RESUME_INCOMPLETE:
last_byte = self.__GetLastByte(
self._GetRangeHeaderFromResponse(response))
if last_byte + 1 != end:
self.stream.seek(last_byte)
self.stream.seek(last_byte + 1)
return response

def __SendMediaBody(self, start, additional_headers=None):
Expand Down
82 changes: 79 additions & 3 deletions apitools/base/py/transfer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""Tests for transfer.py."""
import string

import httplib2
import mock
import six
from six.moves import http_client
Expand Down Expand Up @@ -360,11 +361,9 @@ def testStreamInChunksCompressed(self):
upload.InitializeUpload(self.request, 'http')
upload.StreamInChunks()
# Get the uploaded request and end position of the stream.
(request, end), _ = mock_result.call_args_list[0]
(request, _), _ = mock_result.call_args_list[0]
# Ensure the mock was called.
self.assertTrue(mock_result.called)
# Ensure the stream was fully read.
self.assertEqual(len(self.sample_data), end)
# Ensure the correct content encoding was set.
self.assertEqual(request.headers['Content-Encoding'], 'gzip')
# Ensure the stream was compresed.
Expand Down Expand Up @@ -498,3 +497,80 @@ def _side_effect(uri, **kwargs): # pylint: disable=unused-argument
len(body))
return responses.pop(0)
return _side_effect

def testRetryRequestChunks(self):
"""Test that StreamInChunks will retry correctly."""
refresh_response = http_wrapper.Response(
info={'status': http_wrapper.RESUME_INCOMPLETE,
'location': 'http://www.uploads.com'},
content='',
request_url='http://www.uploads.com',)

# Create and configure the upload object.
bytes_http = httplib2.Http()
upload = transfer.Upload(
stream=self.sample_stream,
mime_type='text/plain',
total_size=len(self.sample_data),
close_stream=False,
http=bytes_http)

upload.strategy = transfer.RESUMABLE_UPLOAD
# Set the chunk size so the entire stream is uploaded.
upload.chunksize = len(self.sample_data)
# Mock the upload to return the sample response.
with mock.patch.object(bytes_http,
'request') as make_request:
# This side effect also checks the request body.
responses = [
self.response, # Initial request in InitializeUpload().
self.fail_response, # 503 status code from server.
refresh_response, # Refresh upload progress.
self.response, # Successful request.
]
make_request.side_effect = self.HttpRequestSideEffect(responses)

# Initialization.
upload.InitializeUpload(self.request, bytes_http)
upload.StreamInChunks()

# Ensure the mock was called the correct number of times.
self.assertEquals(make_request.call_count, len(responses))

def testStreamInChunks(self):
"""Test StreamInChunks."""
resume_incomplete_responses = [http_wrapper.Response(
info={'status': http_wrapper.RESUME_INCOMPLETE,
'location': 'http://www.uploads.com',
'range': '0-{}'.format(end)},
content='',
request_url='http://www.uploads.com',) for end in [199, 399, 599]]
responses = [
self.response # Initial request in InitializeUpload().
] + resume_incomplete_responses + [
self.response, # Successful request.
]
# Create and configure the upload object.
bytes_http = httplib2.Http()
upload = transfer.Upload(
stream=self.sample_stream,
mime_type='text/plain',
total_size=len(self.sample_data),
close_stream=False,
http=bytes_http)

upload.strategy = transfer.RESUMABLE_UPLOAD
# Set the chunk size so the entire stream is uploaded.
upload.chunksize = 200
# Mock the upload to return the sample response.
with mock.patch.object(bytes_http,
'request') as make_request:
# This side effect also checks the request body.
make_request.side_effect = self.HttpRequestSideEffect(responses)

# Initialization.
upload.InitializeUpload(self.request, bytes_http)
upload.StreamInChunks()

# Ensure the mock was called the correct number of times.
self.assertEquals(make_request.call_count, len(responses))
1 change: 1 addition & 0 deletions default.pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ disable =
cyclic-import,
fixme,
import-error,
inconsistent-return-statements,
locally-disabled,
locally-enabled,
no-member,
Expand Down

0 comments on commit 8f99093

Please sign in to comment.