
Commit

Fix issue where uploading files would hang if a request failed while using stream_slice.StreamSlice.
kevinli7 authored and vilasj committed Dec 6, 2017
1 parent 48e438b commit 7ccade5
Showing 2 changed files with 99 additions and 2 deletions.
13 changes: 12 additions & 1 deletion apitools/base/py/transfer.py
@@ -982,8 +982,13 @@ def __SendMediaBody(self, start, additional_headers=None):
if self.total_size is None:
raise exceptions.TransferInvalidError(
'Total size must be known for SendMediaBody')
# Change body_stream from a stream to a string object. This is
# because http_wrapper.MakeRequest doesn't handle the case where
# request.body is a stream. If the body is a stream and a request
# has to be retried, the stream will already be exhausted and the
# retried request will hang.
body_stream = stream_slice.StreamSlice(
self.stream, self.total_size - start)
self.stream, self.total_size - start).read()

@houglum (Contributor) commented on Dec 8, 2017

Isn't the whole point of using a StreamSlice to be able to read a small part of a stream, rather than the entire thing all at once? This means calls to an Upload object's StreamMedia() will attempt to read the whole stream into memory, which defeats the purpose of using a stream for large files and falls apart if the bytes you're streaming in won't all fit into memory.

FWIW, I found this out from running gsutil's integration tests and seeing that some of our tests started failing because we were reading more bytes than expected when calling StreamMedia on an Upload object, resulting in read() being called on the stream wrapper with a higher number of bytes than expected:
https://github.com/GoogleCloudPlatform/gsutil/blob/8f8a61a7d7b7b5202f3cf1dc5e49b8082669a05d/gslib/daisy_chain_wrapper.py#L217
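
For illustration, a minimal sketch of the memory concern, using io.BytesIO as a stand-in for a large upload source (the sizes and the chunked loop are illustrative, not apitools or gsutil internals): reading the slice in bounded chunks keeps memory flat, while the bare read() added in this commit buffers the whole slice at once.

```python
import io

from apitools.base.py import stream_slice

source = io.BytesIO(b'x' * (1024 * 1024))  # stand-in for a much larger stream

# Bounded consumption: at most chunk_size bytes are held in memory at a time.
bounded = stream_slice.StreamSlice(source, 1024 * 1024)
chunk_size = 64 * 1024
while True:
    chunk = bounded.read(chunk_size)
    if not chunk:
        break
    # ... hand `chunk` to the HTTP layer ...

# What this commit does instead: buffer the entire slice as one bytes object,
# so memory scales with the upload size rather than the chunk size.
source.seek(0)
whole_body = stream_slice.StreamSlice(source, 1024 * 1024).read()
```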

@kevinli7 (Author, Contributor) commented on Dec 8, 2017

This was an attempt to fix the case where a request failed and subsequent retries would contain either no data or the wrong data. If we remove the read here, that problem comes back. I thought this function was supposed to "Send the entire media stream in a single request."

Do you think a better solution would be some sort of built-in retry feature in StreamSlice that http_wrapper.MakeRequest is aware of? The retry feature would be something like setting a bit on the StreamSlice object so that it just returns the last read output when read() is called.
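
For concreteness, a rough sketch of that idea — hypothetical, not what this commit or apitools implements. The class name ReplayableStreamSlice and the mark_retry() hook are invented here, and http_wrapper.MakeRequest would have to call the hook before each retry.

```python
from apitools.base.py import stream_slice


class ReplayableStreamSlice(stream_slice.StreamSlice):
    """StreamSlice that can replay its last read() for a retried request."""

    def __init__(self, stream, max_bytes):
        super(ReplayableStreamSlice, self).__init__(stream, max_bytes)
        self._last_read = b''
        self._replay_next_read = False

    def mark_retry(self):
        # The HTTP layer would call this before re-sending a failed request.
        self._replay_next_read = True

    def read(self, size=None):
        if self._replay_next_read:
            # Return the previously read bytes instead of touching the
            # (possibly exhausted) underlying stream.
            self._replay_next_read = False
            return self._last_read
        data = super(ReplayableStreamSlice, self).read(size)
        self._last_read = data
        return data
```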

@kevinli7 (Author, Contributor) commented on Dec 8, 2017

I realized I misunderstood the docstring. Let's discuss different solutions to this problem.


request = http_wrapper.Request(url=self.url, http_method='PUT',
body=body_stream)
@@ -1032,6 +1037,12 @@ def __SendChunk(self, start, additional_headers=None):
else:
end = min(start + self.chunksize, self.total_size)
body_stream = stream_slice.StreamSlice(self.stream, end - start)
# Change body_stream from a stream to a string object. This is
# because http_wrapper.MakeRequest doesn't handle the case where
# request.body is a stream. If the body is a stream and a request
# has to be retried, the stream will already be exhausted and the
# retried request will hang.
body_stream = body_stream.read()
# TODO(craigcitro): Think about clearer errors on "no data in
# stream".
request.body = body_stream
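
To make the failure mode described in the new comments concrete, here is a minimal, self-contained sketch (plain io.BytesIO, no apitools involved) of why retrying a request whose body is a stream goes wrong:

```python
import io

body = io.BytesIO(b'0123456789')   # the bytes to upload
content_length = 10                # header computed up front from total_size

# First attempt: the HTTP layer drains the stream to build the request...
first_payload = body.read()
assert len(first_payload) == content_length

# ...the request fails (e.g. a 503) and is retried with the same body object.
retry_payload = body.read()
assert retry_payload == b''        # the stream is already exhausted

# The retry still advertises Content-Length: 10 but supplies 0 bytes, so the
# server waits for the rest of the body and the upload appears to hang.
# Buffering the StreamSlice into a string up front (as this commit does)
# avoids that, at the cost of holding the whole slice in memory.
```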
88 changes: 87 additions & 1 deletion apitools/base/py/transfer_test.py
@@ -17,6 +17,7 @@
"""Tests for transfer.py."""
import string

import httplib2
import mock
import six
from six.moves import http_client
@@ -310,7 +311,7 @@ def testMultipartEncoding(self):
self.assertTrue(rewritten_upload_contents.endswith(upload_bytes))


class CompressedUploadTest(unittest2.TestCase):
class UploadTest(unittest2.TestCase):

def setUp(self):
# Sample highly compressible data.
@@ -329,6 +330,12 @@ def setUp(self):
'location': 'http://www.uploads.com'},
content='',
request_url='http://www.uploads.com',)
# Sample failure response.
self.fail_response = http_wrapper.Response(
info={'status': http_client.SERVICE_UNAVAILABLE,
'location': 'http://www.uploads.com'},
content='',
request_url='http://www.uploads.com',)

def testStreamInChunksCompressed(self):
"""Test that StreamInChunks will handle compression correctly."""
@@ -477,3 +484,82 @@ def testMediaCompressed(self):
with gzip.GzipFile(fileobj=self.request.body) as f:
original = f.read()
self.assertTrue(self.sample_data in original)

def HttpRequestSideEffect(self, responses=None):
responses = [(response.info, response.content)
for response in responses]

def _side_effect(uri, **kwargs): # pylint: disable=unused-argument
body = kwargs['body']
read_func = getattr(body, 'read', None)
if read_func:
# If the body is a stream, consume the stream.
body = read_func()
self.assertEqual(int(kwargs['headers']['content-length']),
len(body))
return responses.pop(0)
return _side_effect

def testRetryRequestChunks(self):
"""Test that StreamInChunks will retry correctly."""
# Create and configure the upload object.
bytes_http = httplib2.Http()
upload = transfer.Upload(
stream=self.sample_stream,
mime_type='text/plain',
total_size=len(self.sample_data),
close_stream=False,
http=bytes_http)

upload.strategy = transfer.RESUMABLE_UPLOAD
# Set the chunk size so the entire stream is uploaded.
upload.chunksize = len(self.sample_data)
# Mock the upload to return the sample response.
with mock.patch.object(bytes_http,
'request') as make_request:
# This side effect also checks the request body.
responses = [
self.response, # Initial request in InitializeUpload().
self.fail_response, # 503 status code from server.
self.response # Successful request.
]
make_request.side_effect = self.HttpRequestSideEffect(responses)

# Initialization.
upload.InitializeUpload(self.request, bytes_http)
upload.StreamInChunks()

# Ensure the mock was called the correct number of times.
self.assertEquals(make_request.call_count, len(responses))

def testRetryRequestMedia(self):
"""Test that StreamMedia will retry correctly."""
# Create and configure the upload object.
bytes_http = httplib2.Http()
upload = transfer.Upload(
stream=self.sample_stream,
mime_type='text/plain',
total_size=len(self.sample_data),
close_stream=False,
http=bytes_http)

upload.strategy = transfer.RESUMABLE_UPLOAD
# Set the chunk size so the entire stream is uploaded.
upload.chunksize = len(self.sample_data)
# Mock the upload to return the sample response.
with mock.patch.object(bytes_http,
'request') as make_request:
# This side effect also checks the request body.
responses = [
self.response, # Initial request in InitializeUpload().
self.fail_response, # 503 status code from server.
self.response # Successful request.
]
make_request.side_effect = self.HttpRequestSideEffect(responses)

# Initialization.
upload.InitializeUpload(self.request, bytes_http)
upload.StreamMedia()

# Ensure the mock was called the correct number of times.
self.assertEquals(make_request.call_count, len(responses))
