Skip to content

Commit

Permalink
ML-2527: Fix HTTP(S) client connection corruption (#77)
Browse files Browse the repository at this point in the history
* ML-2527: Fix HTTP(S) client connection corruption

* Improve error handling and reduce log level of error

Co-authored-by: Assaf Ben-Amitai <[email protected]>
  • Loading branch information
gtopper and assaf758 committed Sep 4, 2022
1 parent df4be62 commit db7e190
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 32 deletions.
23 changes: 18 additions & 5 deletions v3io/dataplane/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,28 @@ def __init__(self, output, status_code, headers, body):
self.status_code = status_code
self.body = body
self.headers = headers
self.output = None
self._output = output
self._parsed_output = None

if output and self.body:
@property
def output(self):
if self._parsed_output:
return self._parsed_output

if self._output and self.body:
try:
parsed_output = ujson.loads(self.body)
# TODO: It's expensive to always try to parse as JSON first. Better use headers or a heuristic to decide the format.
try:
parsed_output = ujson.loads(self.body)
except Exception:
parsed_output = xml.etree.ElementTree.fromstring(self.body)
except Exception:
parsed_output = xml.etree.ElementTree.fromstring(self.body)
raise HttpResponseError(f"Failed to parse response with status {self.status_code}, "
f"body {self.body}, headers={self.headers}")

self._parsed_output = self._output(parsed_output)

self.output = output(parsed_output)
return self._parsed_output

def raise_for_status(self, expected_statuses=None):
if expected_statuses == v3io.dataplane.transport.RaiseForStatus.never:
Expand Down
57 changes: 30 additions & 27 deletions v3io/dataplane/transport/httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, ve

# create the pool connection
self._create_connections(self.max_connections,
self._host,
self._ssl_context)
self._host,
self._ssl_context)

# python 2 and 3 have different exceptions
if sys.version_info[0] >= 3:
Expand All @@ -57,13 +57,11 @@ def send_request(self, request):
# TODO: consider getting param of whether we should block or not (wait for connection to be free or raise exception)
connection = self._free_connections.get(block=True, timeout=None)

# set the used connection on the request
setattr(request.transport, 'connection_used', connection)

# get a connection for the request and send it
try:
return self._send_request_on_connection(request, connection)
except BaseException as e:
connection.close()
connection = self._create_connection(self._host, self._ssl_context)
self._free_connections.put(connection, block=True)
raise e

Expand Down Expand Up @@ -95,37 +93,40 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
# return the response
return response

except self._wait_response_exceptions as e:
except v3io.dataplane.response.HttpResponseError as response_error:
self._logger.warn_with('Response error: {}'.format(str(response_error)))
raise response_error
except BaseException as e:
if num_retries == 0:
self._logger.warn_with('Remote disconnected while waiting for response and ran out of retries',
e=type(e),
connection=connection)
self._logger.error_with('Remote disconnected while waiting for response and ran out of retries',
e=type(e),
e_msg=e,
response_body=response_body,
status_code=status_code,
headers=headers,
connection=connection)

raise e

self._logger.debug_with('Remote disconnected while waiting for response',
retries_left=num_retries,
e=type(e),
e_msg=e,
connection=connection)

num_retries -= 1

# make sure connections is closed (connection.connect is called automaticly when connection is closed)
connection.close()
connection = self._create_connection(self._host, self._ssl_context)

# re-send the request on the connection
request = self._send_request_on_connection(request, connection)
except v3io.dataplane.response.HttpResponseError as response_error:
self._logger.warn_with('Response error: {}'.format(str(response_error)))
raise response_error
except BaseException as e:
self._logger.warn_with('Unhandled exception while waiting for response',
e=type(e),
connection=connection)
raise e
finally:
self._free_connections.put(connection, block=True)

def _send_request_on_connection(self, request, connection):
setattr(request.transport, 'connection_used', connection)

path = request.encode_path()

self.log('Tx',
Expand All @@ -136,15 +137,17 @@ def _send_request_on_connection(self, request, connection):
body=request.body)

try:
connection.request(request.method, path, request.body, request.headers)
except self._send_request_exceptions as e:
self._logger.debug_with('Disconnected while attempting to send. Recreating connection', e=type(e))

# re-request (connection.connect is called automaticly when connection is closed)
connection.close()
connection.request(request.method, path, request.body, request.headers)
try:
connection.request(request.method, path, request.body, request.headers)
except self._send_request_exceptions as e:
self._logger.debug_with('Disconnected while attempting to send. Recreating connection and retrying',
e=type(e), e_msg=e, connection=connection)
connection.close()
connection = self._create_connection(self._host, self._ssl_context)
request.transport.connection_used = connection
connection.request(request.method, path, request.body, request.headers)
except BaseException as e:
self._logger.warn_with('Unhandled exception while sending request', e=type(e))
self._logger.error_with('Unhandled exception while sending request', e=type(e), e_msg=e, connection=connection)
raise e

return request
Expand Down

0 comments on commit db7e190

Please sign in to comment.