Skip to content

Commit

Permalink
fix: threaded_stream (#1463)
Browse files Browse the repository at this point in the history
* chore: add more test_threaded_stream tests

* lint

* add support for python 3.7

* test fix

* allow for async callbacks

* remove example

* remove prints

* format file

* update readme

* ruff

* fix test import

* add conftest

* fix: fix test for 3.12

* fix test for 312

---------

Co-authored-by: carlosmiei <[email protected]>
Co-authored-by: Pablo <[email protected]>
  • Loading branch information
3 people authored Dec 30, 2024
1 parent d8e2ce1 commit 2e909c9
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 5 deletions.
20 changes: 15 additions & 5 deletions binance/ws/threaded_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ def __init__(
"https_proxy": https_proxy,
}

async def _before_socket_listener_start(self):
...
async def _before_socket_listener_start(self): ...

async def socket_listener(self):
self._client = await AsyncClient.create(loop=self._loop, **self._client_params)
Expand All @@ -55,8 +54,11 @@ async def start_listener(self, socket, path: str, callback):
continue
else:
if not msg:
continue
callback(msg)
continue # Handle both async and sync callbacks
if asyncio.iscoroutinefunction(callback):
await callback(msg)
else:
callback(msg)
del self._socket_running[path]

def run(self):
Expand All @@ -75,6 +77,14 @@ def stop(self):
if not self._running:
return
self._running = False
self._loop.call_soon(asyncio.create_task, self.stop_client())
if self._client and self._loop and not self._loop.is_closed():
try:
future = asyncio.run_coroutine_threadsafe(
self.stop_client(), self._loop
)
future.result(timeout=5) # Add timeout to prevent hanging
except Exception as e:
# Log the error but don't raise it
print(f"Error stopping client: {e}")
for socket_name in self._socket_running.keys():
self._socket_running[socket_name] = False
7 changes: 7 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import asyncio
import logging

from binance.ws.streams import ThreadedWebsocketManager

proxies = {}
proxy = os.getenv("PROXY")

Expand Down Expand Up @@ -75,6 +77,11 @@ def futuresClientAsync():
def liveClientAsync():
return AsyncClient(api_key, api_secret, https_proxy=proxy, testnet=False)

@pytest.fixture(scope="function")
def manager():
return ThreadedWebsocketManager(
api_key="test_key", api_secret="test_secret", https_proxy=proxy, testnet=True
)

@pytest.fixture(autouse=True, scope="function")
def event_loop():
Expand Down
193 changes: 193 additions & 0 deletions tests/test_threaded_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import pytest
import asyncio
from binance.ws.threaded_stream import ThreadedApiManager
from unittest.mock import Mock

# For Python 3.7 compatibility
try:
from unittest.mock import AsyncMock
except ImportError:
# Create our own AsyncMock for Python 3.7
class AsyncMock(Mock):
async def __call__(self, *args, **kwargs):
return super(AsyncMock, self).__call__(*args, **kwargs)

async def __aenter__(self):
return self

async def __aexit__(self, *args):
return None

async def __aiter__(self):
return self

async def __anext__(self):
raise StopAsyncIteration

@pytest.mark.asyncio
async def test_initialization():
"""Test that manager initializes with correct parameters"""
manager = ThreadedApiManager(
api_key="test_key",
api_secret="test_secret",
tld="com",
testnet=True,
requests_params={"timeout": 10},
session_params={"trust_env": True},
)

assert manager._running is True
assert manager._socket_running == {}
assert manager._client_params == {
"api_key": "test_key",
"api_secret": "test_secret",
"requests_params": {"timeout": 10},
"tld": "com",
"testnet": True,
"session_params": {"trust_env": True},
"https_proxy": None
}


@pytest.mark.asyncio
async def test_start_and_stop_socket(manager):
"""Test starting and stopping a socket"""
socket_name = "test_socket"

# AsyncMock socket creation
mock_socket = AsyncMock()
mock_socket.__aenter__ = AsyncMock(return_value=mock_socket)
mock_socket.__aexit__ = AsyncMock(return_value=None)

# Track number of recv calls
recv_count = 0
async def controlled_recv():
nonlocal recv_count
recv_count += 1
# If we've stopped the socket or read enough times, simulate connection closing
if not manager._socket_running.get(socket_name) or recv_count > 2:
raise websockets.exceptions.ConnectionClosed(None, None)
await asyncio.sleep(0.1)
return '{"e": "value"}'

mock_socket.recv = controlled_recv

# AsyncMock callback
callback = AsyncMock()

# Start socket listener
manager._socket_running[socket_name] = True
listener_task = asyncio.create_task(
manager.start_listener(mock_socket, socket_name, callback)
)

# Give some time for the listener to start and receive a message
await asyncio.sleep(0.2)

# Stop the socket
manager.stop_socket(socket_name)

# Wait for the listener task to complete
try:
await asyncio.wait_for(listener_task, timeout=1.0)
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
pass # These exceptions are expected during shutdown

assert socket_name not in manager._socket_running


@pytest.mark.asyncio
async def test_socket_listener_timeout(manager):
"""Test socket listener handling timeout"""
socket_name = "test_socket"

# AsyncMock socket that times out every time
mock_socket = AsyncMock()
mock_socket.__aenter__ = AsyncMock(return_value=mock_socket)
mock_socket.__aexit__ = AsyncMock(return_value=None)

async def controlled_recv():
await asyncio.sleep(0.1)
raise asyncio.TimeoutError("Simulated Timeout")

mock_socket.recv = controlled_recv

callback = AsyncMock()

# Start socket listener
manager._socket_running[socket_name] = True
listener_task = asyncio.create_task(
manager.start_listener(mock_socket, socket_name, callback)
)

# Give some time for a few timeout cycles
await asyncio.sleep(0.3)

# Stop the socket
manager.stop_socket(socket_name)

# Wait for the listener to finish
try:
await asyncio.wait_for(listener_task, timeout=1.0)
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
listener_task.cancel()

# Callback should not have been called (no successful messages)
callback.assert_not_called()
assert socket_name not in manager._socket_running


@pytest.mark.asyncio
async def test_stop_client(manager):
"""Test stopping the client"""
# AsyncMock AsyncClient
mock_client = AsyncMock()
mock_client.close_connection = AsyncMock()
manager._client = mock_client

await manager.stop_client()
mock_client.close_connection.assert_called_once()


@pytest.mark.asyncio
async def test_stop(manager):
"""Test stopping the manager"""
socket_name = "test_socket"
manager._socket_running[socket_name] = True

manager.stop()

assert manager._running is False
assert manager._socket_running[socket_name] is False


@pytest.mark.asyncio
async def test_multiple_sockets(manager):
"""Test managing multiple sockets"""
socket_names = ["socket1", "socket2", "socket3"]

# Start multiple sockets
for name in socket_names:
manager._socket_running[name] = True

# Stop all sockets
manager.stop()

# Verify all sockets are stopped
for name in socket_names:
assert manager._socket_running[name] is False


@pytest.mark.asyncio
async def test_stop_client_when_not_initialized(manager):
"""Test stopping client when it hasn't been initialized"""
manager._client = None
await manager.stop_client() # Should not raise any exception


@pytest.mark.asyncio
async def test_stop_when_already_stopped(manager):
"""Test stopping manager when it's already stopped"""
manager._running = False
manager.stop() # Should not raise any exception or change state
assert manager._running is False

0 comments on commit 2e909c9

Please sign in to comment.