Skip to content

Commit

Permalink
fix: reconnecting websocket (#1486)
Browse files Browse the repository at this point in the history
* fix: adjust logs in reconnecting websocket

* fix error type

* change to debug

* add exception

* fix test

* update conect and redirect and add tests

* format

* fix test

* fix pyrihgt

* docs and format

* skip for 3.7

* fix for test in python 3.7

* ruff format

* add websockets_proxy as requirement

* websockets proxy docs and requiremetns
  • Loading branch information
pcriadoperez authored Dec 2, 2024
1 parent cbd296b commit 39bbd87
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 52 deletions.
8 changes: 8 additions & 0 deletions binance/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ class BinanceWebsocketUnableToConnect(Exception):
pass


class BinanceWebsocketQueueOverflow(Exception):
"""Raised when the websocket message queue exceeds its maximum size."""
pass

class BinanceWebsocketClosed(Exception):
"""Raised when websocket connection is closed."""
pass

class NotImplementedException(Exception):
def __init__(self, value):
message = f"Not implemented: {value}"
Expand Down
2 changes: 1 addition & 1 deletion binance/ws/keepalive_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async def _keepalive_socket(self):
if listen_key != self._path:
self._log.debug("listen key changed: reconnect")
self._path = listen_key
await self._reconnect()
self._reconnect()
else:
self._log.debug("listen key same: keepalive")
if self._keepalive_type == "user":
Expand Down
111 changes: 69 additions & 42 deletions binance/ws/reconnecting_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,19 @@
proxy_connect = None
try:
from websockets_proxy import Proxy as w_Proxy, proxy_connect as w_proxy_connect

Proxy = w_Proxy
proxy_connect = w_proxy_connect
except ImportError:
pass

import websockets as ws

from binance.exceptions import BinanceWebsocketUnableToConnect
from binance.exceptions import (
BinanceWebsocketClosed,
BinanceWebsocketUnableToConnect,
BinanceWebsocketQueueOverflow,
)
from binance.helpers import get_loop
from binance.ws.constants import WSListenerState

Expand Down Expand Up @@ -89,6 +94,7 @@ async def close(self):
await self.__aexit__(None, None, None)

async def __aexit__(self, exc_type, exc_val, exc_tb):
self._log.debug(f"Closing Websocket {self._url}{self._prefix}{self._path}")
if self._exit_coro:
await self._exit_coro(self._path)
self.ws_state = WSListenerState.EXITING
Expand All @@ -98,7 +104,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._conn.__aexit__(exc_type, exc_val, exc_tb)
self.ws = None
if self._handle_read_loop:
self._log.error("CANCEL read_loop")
await self._kill_read_loop()

async def connect(self):
Expand All @@ -113,22 +118,25 @@ async def connect(self):
# handle https_proxy
if self._https_proxy:
if not Proxy or not proxy_connect:
raise ImportError("websockets_proxy is not installed, please install it to use a websockets proxy (pip install websockets_proxy)")
proxy = Proxy.from_url(self._https_proxy) # type: ignore
self._conn = proxy_connect(ws_url, close_timeout=0.1, proxy=proxy, **self._ws_kwargs) # type: ignore
raise ImportError(
"websockets_proxy is not installed, please install it to use a websockets proxy (pip install websockets_proxy)"
)
proxy = Proxy.from_url(self._https_proxy) # type: ignore
self._conn = proxy_connect(
ws_url, close_timeout=0.1, proxy=proxy, **self._ws_kwargs
) # type: ignore
else:
self._conn = ws.connect(ws_url, close_timeout=0.1, **self._ws_kwargs) # type: ignore

try:
self.ws = await self._conn.__aenter__()
except Exception as e: # noqa
self._log.error(f"Failed to connect to websocket: {e}")
self.ws_state = WSListenerState.INITIALISING
return
self.ws_state = WSListenerState.RECONNECTING
raise e
self.ws_state = WSListenerState.STREAMING
self._reconnects = 0
await self._after_connect()
# To manage the "cannot call recv while another coroutine is already waiting for the next message"
if not self._handle_read_loop:
self._handle_read_loop = self._loop.call_soon_threadsafe(
asyncio.create_task, self._read_loop()
Expand All @@ -150,13 +158,23 @@ def _handle_message(self, evt):
if self._is_binary:
try:
evt = gzip.decompress(evt)
except (ValueError, OSError):
return None
except (ValueError, OSError) as e:
self._log.error(f"Failed to decompress message: {(e)}")
raise
except Exception as e:
self._log.error(f"Unexpected decompression error: {(e)}")
raise
try:
return self.json_loads(evt)
except ValueError:
self._log.debug(f"error parsing evt json:{evt}")
return None
except ValueError as e:
self._log.error(f"JSON Value Error parsing message: Error: {(e)}")
raise
except TypeError as e:
self._log.error(f"JSON Type Error parsing message. Error: {(e)}")
raise
except Exception as e:
self._log.error(f"Unexpected error parsing message. Error: {(e)}")
raise

async def _read_loop(self):
try:
Expand All @@ -174,45 +192,56 @@ async def _read_loop(self):
await asyncio.sleep(0.1)
continue
elif self.ws.state == ws.protocol.State.CLOSED: # type: ignore
await self._reconnect()
self._reconnect()
raise BinanceWebsocketClosed(
"Connection closed. Reconnecting..."
)
elif self.ws_state == WSListenerState.STREAMING:
assert self.ws
res = await asyncio.wait_for(
self.ws.recv(), timeout=self.TIMEOUT
)
res = self._handle_message(res)
self._log.debug(f"Received message: {res}")
if res:
if self._queue.qsize() < self.MAX_QUEUE_SIZE:
await self._queue.put(res)
else:
self._log.debug(
f"Queue overflow {self.MAX_QUEUE_SIZE}. Message not filled"
)
await self._queue.put(
{
"e": "error",
"m": "Queue overflow. Message not filled",
}
raise BinanceWebsocketQueueOverflow(
f"Message queue size {self._queue.qsize()} exceeded maximum {self.MAX_QUEUE_SIZE}"
)
raise BinanceWebsocketUnableToConnect
except asyncio.TimeoutError:
self._log.debug(f"no message in {self.TIMEOUT} seconds")
# _no_message_received_reconnect
except asyncio.CancelledError as e:
self._log.debug(f"cancelled error {e}")
self._log.debug(f"_read_loop cancelled error {e}")
break
except asyncio.IncompleteReadError as e:
self._log.debug(f"incomplete read error ({e})")
except ConnectionClosedError as e:
self._log.debug(f"connection close error ({e})")
except gaierror as e:
self._log.debug(f"DNS Error ({e})")
except BinanceWebsocketUnableToConnect as e:
self._log.debug(f"BinanceWebsocketUnableToConnect ({e})")
except (
asyncio.IncompleteReadError,
gaierror,
ConnectionClosedError,
BinanceWebsocketClosed,
) as e:
# reports errors and continue loop
self._log.error(f"{e.__class__.__name__} ({e})")
await self._queue.put({
"e": "error",
"type": f"{e.__class__.__name__}",
"m": f"{e}",
})
except (
BinanceWebsocketUnableToConnect,
BinanceWebsocketQueueOverflow,
Exception,
) as e:
# reports errors and break the loop
self._log.error(f"Unknown exception ({e})")
await self._queue.put({
"e": "error",
"type": e.__class__.__name__,
"m": f"{e}",
})
break
except Exception as e:
self._log.debug(f"Unknown exception ({e})")
continue
finally:
self._handle_read_loop = None # Signal the coro is stopped
self._reconnects = 0
Expand All @@ -226,11 +255,13 @@ async def _run_reconnect(self):
f"waiting {reconnect_wait}"
)
await asyncio.sleep(reconnect_wait)
await self.connect()
try:
await self.connect()
except Exception as e:
pass
else:
self._log.error(f"Max reconnections {self.MAX_RECONNECTS} reached:")
# Signal the error
await self._queue.put({"e": "error", "m": "Max reconnect retries reached"})
raise BinanceWebsocketUnableToConnect

async def recv(self):
Expand Down Expand Up @@ -262,9 +293,5 @@ async def before_reconnect(self):

self._reconnects += 1

def _no_message_received_reconnect(self):
self._log.debug("No message received, reconnecting")
self.ws_state = WSListenerState.RECONNECTING

async def _reconnect(self):
def _reconnect(self):
self.ws_state = WSListenerState.RECONNECTING
32 changes: 31 additions & 1 deletion docs/overview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ Check out the `requests documentation <http://docs.python-requests.org/en/master

**Proxy Settings**

You can use the Requests Settings method above
You can use the Requests Settings method above. For websockets python 3.8+ is required

.. code:: python
Expand Down Expand Up @@ -246,4 +246,34 @@ For Windows environments
C:\>set HTTP_PROXY=http://10.10.1.10:3128
C:\>set HTTPS_PROXY=http://10.10.1.10:1080
Logging
-------

python-binance uses the Python logging module. You can enable logging to help debug issues and monitor your application.

Basic Logging Setup
~~~~~~~~~~~~~~~~~~

To enable debug logging, add this at the start of your script:

.. code:: python
import logging
logging.basicConfig(level=logging.DEBUG)
Advanced Logging Setup
~~~~~~~~~~~~~~~~~~~~~

For more detailed logging with timestamps and log levels:

.. code:: python
import logging
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
.. image:: https://analytics-pixel.appspot.com/UA-111417213-1/github/python-binance/docs/overview?pixel
5 changes: 4 additions & 1 deletion docs/websockets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,13 @@ can do this.
Websocket Errors
----------------

If the websocket is disconnected and is unable to reconnect, a message is sent to the callback to indicate this. The format is
If an error occurs, a message is sent to the callback to indicate this. The format is

.. code:: python
{
'e': 'error',
'type': 'BinanceWebsocketUnableToConnect',
'm': 'Max reconnect retries reached'
}
Expand All @@ -232,6 +233,8 @@ If the websocket is disconnected and is unable to reconnect, a message is sent t
else:
# process message normally
Websocket Examples
----------------

`Multiplex Socket <binance.html#binance.websockets.BinanceSocketManager.multiplex_socket>`_
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ dateparser
pycryptodome
requests
websockets
websockets_proxy; python_version >= '3.8'
Loading

0 comments on commit 39bbd87

Please sign in to comment.