diff --git a/plexwebsocket.py b/plexwebsocket.py index 9febd73..b1ba003 100644 --- a/plexwebsocket.py +++ b/plexwebsocket.py @@ -1,5 +1,6 @@ """Support for issuing callbacks for Plex client updates via websockets.""" import asyncio +from concurrent.futures._base import CancelledError from datetime import datetime import logging @@ -7,6 +8,11 @@ _LOGGER = logging.getLogger(__name__) +STATE_CONNECTED = "connected" +STATE_DISCONNECTED = "disconnected" +STATE_STARTING = "starting" +STATE_STOPPED = "stopped" + class WebsocketPlayer: # pylint: disable=too-few-public-methods """Represent an individual player state in the Plex websocket stream.""" @@ -33,6 +39,8 @@ def significant_position_change(self, timestamp, new_position): class PlexWebsocket: """Represent a websocket connection to a Plex server.""" + # pylint: disable=too-many-instance-attributes + def __init__(self, plex_server, callback, session=None, verify_ssl=True): """Initialize a PlexWebsocket instance. @@ -51,9 +59,20 @@ def __init__(self, plex_server, callback, session=None, verify_ssl=True): self.uri = self._get_uri(plex_server) self.players = {} self.callback = callback - self._active = True - self._current_task = None self._ssl = False if verify_ssl is False else None + self._state = None + self.failed_attempts = 0 + + @property + def state(self): + """Return the current state.""" + return self._state + + @state.setter + def state(self, value): + """Set the state.""" + self._state = value + _LOGGER.debug("Websocket %s", value) @staticmethod def _get_uri(plex_server): @@ -62,43 +81,61 @@ def _get_uri(plex_server): "/:/websockets/notifications", includeToken=True ).replace("http", "ws") - async def listen(self): + async def running(self): """Open a persistent websocket connection and act on events.""" - self._active = True - failed_attempts = 0 - while self._active: - try: - async with self.session.ws_connect( - self.uri, heartbeat=15, ssl=self._ssl - ) as ws_client: - failed_attempts = 0 - self._current_task = asyncio.Task.current_task() - _LOGGER.debug("Websocket connected") - self.callback() - - async for message in ws_client: + self.state = STATE_STARTING + + try: + async with self.session.ws_connect( + self.uri, heartbeat=15, ssl=self._ssl + ) as ws_client: + self.state = STATE_CONNECTED + self.failed_attempts = 0 + self.callback() + + async for message in ws_client: + if self.state == STATE_STOPPED: + break + + if message.type == aiohttp.WSMsgType.TEXT: msg = message.json()["NotificationContainer"] if self.player_event(msg): self.callback() - except aiohttp.client_exceptions.ClientConnectionError as error: - retry_delay = min(2 ** (failed_attempts - 1) * 30, 300) - failed_attempts += 1 + elif message.type == aiohttp.WSMsgType.CLOSED: + _LOGGER.warning("AIOHTTP websocket connection closed") + break + + elif message.type == aiohttp.WSMsgType.ERROR: + _LOGGER.error("AIOHTTP websocket error") + break + + except aiohttp.ClientConnectionError as error: + if self.state != STATE_STOPPED: + retry_delay = min(2 ** (self.failed_attempts - 1) * 30, 300) + self.failed_attempts += 1 _LOGGER.error( "Websocket connection failed, retrying in %ds: %s", retry_delay, error, ) + self.state = STATE_DISCONNECTED await asyncio.sleep(retry_delay) - except Exception as error: # pylint: disable=broad-except + except CancelledError: + _LOGGER.debug("Websocket future cancelled") + self.state = STATE_STOPPED + except Exception as error: # pylint: disable=broad-except + if self.state != STATE_STOPPED: _LOGGER.exception("Unexpected exception occurred: %s", error) + self.state = STATE_DISCONNECTED await asyncio.sleep(10) - else: - _LOGGER.error("Websocket disconnected") - if self._active: - # Session IDs reset if Plex server has restarted, be safe - self.players.clear() - await asyncio.sleep(5) + else: + if self.state != STATE_STOPPED: + self.state = STATE_DISCONNECTED + + # Session IDs reset if Plex server has restarted, be safe + self.players.clear() + await asyncio.sleep(5) def player_event(self, msg): """Determine if messages relate to an interesting player event.""" @@ -155,8 +192,12 @@ def player_event(self, msg): return should_fire + async def listen(self): + """Close the listening websocket.""" + self.failed_attempts = 0 + while self.state != STATE_STOPPED: + await self.running() + def close(self): """Close the listening websocket.""" - self._active = False - if self._current_task is not None: - self._current_task.cancel() + self.state = STATE_STOPPED diff --git a/setup.py b/setup.py index ad26c73..50f6a88 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ with open('README.md') as f: long_description = f.read() -VERSION="0.0.8" +VERSION="0.0.9" setup( name='plexwebsocket',