From db7e190f44c4e3d4623051eb98b51a58f11635ff Mon Sep 17 00:00:00 2001
From: Gal Topper
Date: Mon, 5 Sep 2022 05:09:55 +0800
Subject: [PATCH] ML-2527: Fix HTTP(S) client connection corruption (#77)

* ML-2527: Fix HTTP(S) client connection corruption

* Improve error handling and reduce log level of error

Co-authored-by: Assaf Ben-Amitai
---
 v3io/dataplane/response.py             | 23 ++++++++---
 v3io/dataplane/transport/httpclient.py | 57 ++++++++++++++------------
 2 files changed, 48 insertions(+), 32 deletions(-)

diff --git a/v3io/dataplane/response.py b/v3io/dataplane/response.py
index 7c37480..cbf51fc 100644
--- a/v3io/dataplane/response.py
+++ b/v3io/dataplane/response.py
@@ -29,15 +29,28 @@ def __init__(self, output, status_code, headers, body):
         self.status_code = status_code
         self.body = body
         self.headers = headers
-        self.output = None
+        self._output = output
+        self._parsed_output = None
 
-        if output and self.body:
+    @property
+    def output(self):
+        if self._parsed_output:
+            return self._parsed_output
+
+        if self._output and self.body:
             try:
-                parsed_output = ujson.loads(self.body)
+                # TODO: It's expensive to always try to parse as JSON first. Better use headers or a heuristic to decide the format.
+                try:
+                    parsed_output = ujson.loads(self.body)
+                except Exception:
+                    parsed_output = xml.etree.ElementTree.fromstring(self.body)
             except Exception:
-                parsed_output = xml.etree.ElementTree.fromstring(self.body)
+                raise HttpResponseError(f"Failed to parse response with status {self.status_code}, "
+                                        f"body {self.body}, headers={self.headers}")
+
+            self._parsed_output = self._output(parsed_output)
 
-            self.output = output(parsed_output)
+        return self._parsed_output
 
     def raise_for_status(self, expected_statuses=None):
         if expected_statuses == v3io.dataplane.transport.RaiseForStatus.never:
diff --git a/v3io/dataplane/transport/httpclient.py b/v3io/dataplane/transport/httpclient.py
index 57b4023..2715eae 100644
--- a/v3io/dataplane/transport/httpclient.py
+++ b/v3io/dataplane/transport/httpclient.py
@@ -35,8 +35,8 @@ def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, ve
 
         # create the pool connection
         self._create_connections(self.max_connections,
-                                    self._host,
-                                    self._ssl_context)
+                                 self._host,
+                                 self._ssl_context)
 
         # python 2 and 3 have different exceptions
         if sys.version_info[0] >= 3:
@@ -57,13 +57,11 @@ def send_request(self, request):
         # TODO: consider getting param of whether we should block or not (wait for connection to be free or raise exception)
         connection = self._free_connections.get(block=True, timeout=None)
 
-        # set the used connection on the request
-        setattr(request.transport, 'connection_used', connection)
-
-        # get a connection for the request and send it
         try:
             return self._send_request_on_connection(request, connection)
         except BaseException as e:
+            connection.close()
+            connection = self._create_connection(self._host, self._ssl_context)
             self._free_connections.put(connection, block=True)
             raise e
 
@@ -95,37 +93,40 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
                     # return the response
                     return response
 
-                except self._wait_response_exceptions as e:
+                except v3io.dataplane.response.HttpResponseError as response_error:
+                    self._logger.warn_with('Response error: {}'.format(str(response_error)))
+                    raise response_error
+                except BaseException as e:
                     if num_retries == 0:
-                        self._logger.warn_with('Remote disconnected while waiting for response and ran out of retries',
-                                               e=type(e),
-                                               connection=connection)
+                        self._logger.error_with('Remote disconnected while waiting for response and ran out of retries',
+                                                e=type(e),
+                                                e_msg=e,
+                                                response_body=response_body,
+                                                status_code=status_code,
+                                                headers=headers,
+                                                connection=connection)
 
                         raise e
 
                     self._logger.debug_with('Remote disconnected while waiting for response',
                                             retries_left=num_retries,
+                                            e=type(e),
+                                            e_msg=e,
                                             connection=connection)
 
                     num_retries -= 1
 
-                    # make sure connections is closed (connection.connect is called automaticly when connection is closed)
                     connection.close()
+                    connection = self._create_connection(self._host, self._ssl_context)
 
                     # re-send the request on the connection
                     request = self._send_request_on_connection(request, connection)
-                except v3io.dataplane.response.HttpResponseError as response_error:
-                    self._logger.warn_with('Response error: {}'.format(str(response_error)))
-                    raise response_error
-                except BaseException as e:
-                    self._logger.warn_with('Unhandled exception while waiting for response',
-                                           e=type(e),
-                                           connection=connection)
-                    raise e
         finally:
             self._free_connections.put(connection, block=True)
 
     def _send_request_on_connection(self, request, connection):
+        setattr(request.transport, 'connection_used', connection)
+
         path = request.encode_path()
 
         self.log('Tx',
@@ -136,15 +137,17 @@ def _send_request_on_connection(self, request, connection):
                  body=request.body)
 
         try:
-            connection.request(request.method, path, request.body, request.headers)
-        except self._send_request_exceptions as e:
-            self._logger.debug_with('Disconnected while attempting to send. Recreating connection', e=type(e))
-
-            # re-request (connection.connect is called automaticly when connection is closed)
-            connection.close()
-            connection.request(request.method, path, request.body, request.headers)
+            try:
+                connection.request(request.method, path, request.body, request.headers)
+            except self._send_request_exceptions as e:
+                self._logger.debug_with('Disconnected while attempting to send. Recreating connection and retrying',
+                                        e=type(e), e_msg=e, connection=connection)
+                connection.close()
+                connection = self._create_connection(self._host, self._ssl_context)
+                request.transport.connection_used = connection
+                connection.request(request.method, path, request.body, request.headers)
         except BaseException as e:
-            self._logger.warn_with('Unhandled exception while sending request', e=type(e))
+            self._logger.error_with('Unhandled exception while sending request', e=type(e), e_msg=e, connection=connection)
             raise e
 
         return request