From cb64dd6d248e865bd598aaae0ee9f29483ccaf87 Mon Sep 17 00:00:00 2001 From: Thijs Walcarius Date: Wed, 13 Oct 2021 15:11:42 +0200 Subject: [PATCH] Move to asyncio Protocol instead of reader/writer (#11) * Move to asyncio Protocol instead of reader/writer * Fix _on_connection_lost * Update on PR * Pre-commit update * Increased the version * create explicit copy of data bytes Co-authored-by: Thijs Walcarius Co-authored-by: cereal2nd --- examples/load_modules.py | 2 +- requirements.txt | 5 +- setup.py | 8 +- velbusaio/const.py | 27 +++- velbusaio/controller.py | 103 ++++++------ velbusaio/handler.py | 25 +-- velbusaio/helpers.py | 14 -- velbusaio/message.py | 30 +--- velbusaio/messages/cover_down.py | 2 - velbusaio/messages/module_status.py | 1 + velbusaio/messages/restore_dimmer.py | 2 +- velbusaio/messages/set_date.py | 2 - velbusaio/messages/set_daylight_saving.py | 2 - velbusaio/messages/set_dimmer.py | 2 - velbusaio/messages/set_realtime_clock.py | 2 - velbusaio/messages/switch_relay_off.py | 2 - velbusaio/messages/switch_relay_on.py | 2 - velbusaio/module_registry.py | 2 +- velbusaio/parser.py | 142 ----------------- velbusaio/protocol.py | 184 ++++++++++++++++++++++ velbusaio/raw_message.py | 154 ++++++++++++++++++ velbusaio/util.py | 49 ++++++ 22 files changed, 482 insertions(+), 280 deletions(-) delete mode 100644 velbusaio/parser.py create mode 100644 velbusaio/protocol.py create mode 100644 velbusaio/raw_message.py create mode 100644 velbusaio/util.py diff --git a/examples/load_modules.py b/examples/load_modules.py index b3af156..21f6a54 100644 --- a/examples/load_modules.py +++ b/examples/load_modules.py @@ -15,7 +15,7 @@ async def main(): # velbus = Velbus("192.168.1.9:27015") # example via serial device # velbus = Velbus("/dev/ttyAMA0") - velbus = Velbus("tls://192.168.1.9:27015") + velbus = Velbus("192.168.1.254:8445") await velbus.connect() for mod in (velbus.get_modules()).values(): print(mod) diff --git a/requirements.txt b/requirements.txt index 84d276b..c31e090 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,4 @@ -pyserial-asyncio +backoff>=1.10.0 +backoff>=1.10.0,<1.11 +pyserial==3.5.0 +pyserial-asyncio>=0.5 diff --git a/setup.py b/setup.py index 1182ad7..7600ef8 100644 --- a/setup.py +++ b/setup.py @@ -4,11 +4,15 @@ setup( name="velbus-aio", - version="2021.9.4", + version="2021.10.1", url="https://github.com/Cereal2nd/velbus-aio", license="MIT", author="Maikel Punie", - install_requires=["pyserial-asyncio"], + install_requires=[ + "pyserial==3.5.0", + "pyserial-asyncio>=0.5", + "backoff>=1.10.0", + ], author_email="maikel.punie@gmail.com", packages=PACKAGES, include_package_data=True, diff --git a/velbusaio/const.py b/velbusaio/const.py index dca88ae..6f1969a 100644 --- a/velbusaio/const.py +++ b/velbusaio/const.py @@ -15,17 +15,26 @@ PRIORITY_LOW, PRIORITY_THIRDPARTY, ] -STX: Final = 0x0F -ETX: Final = 0x04 + + +HEADER_LENGTH: Final = 4 # Header: [Start Byte, priority, address, RTR+data length] +TAIL_LENGTH: Final = 2 # Tail: [CRC, End Byte] +MAX_BODY_SIZE: Final = 8 # Maximum amount of data bytes in a packet + +MINIMUM_MESSAGE_SIZE: Final = ( + HEADER_LENGTH + TAIL_LENGTH +) # Smallest possible packet: [Start Byte, priority, address, RTR+data length, CRC, End Byte] +MAXIMUM_MESSAGE_SIZE: Final = MINIMUM_MESSAGE_SIZE + MAX_BODY_SIZE + +START_BYTE: Final = 0x0F +END_BYTE: Final = 0x04 + + LENGTH_MASK: Final = 0x0F -HEADER_LENGTH: Final = 4 # Header: [STX, priority, address, RTR+data length] -MAX_DATA_AMOUNT: Final = 8 # Maximum amount of data bytes in a packet -MIN_PACKET_LENGTH: Final = ( - 6 # Smallest possible packet: [STX, priority, address, RTR+data length, CRC, ETC] -) -MAX_PACKET_LENGTH: Final = MIN_PACKET_LENGTH + MAX_DATA_AMOUNT + RTR: Final = 0x40 NO_RTR: Final = 0x00 + CACHEDIR: Final = ".velbuscache" LOAD_TIMEOUT: Final = 600 @@ -42,3 +51,5 @@ CHANNEL_EDGE_LIT: Final = 97 CHANNEL_MEMO_TEXT: Final = 98 CHANNEL_LIGHT_VALUE: Final = 99 + +SLEEP_TIME = 60 / 1000 diff --git a/velbusaio/controller.py b/velbusaio/controller.py index 2340812..d7a4c1c 100644 --- a/velbusaio/controller.py +++ b/velbusaio/controller.py @@ -16,12 +16,14 @@ from velbusaio.exceptions import VelbusConnectionFailed, VelbusConnectionTerminated from velbusaio.handler import PacketHandler from velbusaio.helpers import get_cache_dir +from velbusaio.message import Message from velbusaio.messages.module_type_request import ModuleTypeRequestMessage from velbusaio.messages.set_date import SetDate from velbusaio.messages.set_daylight_saving import SetDaylightSaving from velbusaio.messages.set_realtime_clock import SetRealtimeClock from velbusaio.module import Module -from velbusaio.parser import VelbusParser +from velbusaio.protocol import VelbusProtocol +from velbusaio.raw_message import RawMessage class Velbus: @@ -31,19 +33,34 @@ class Velbus: def __init__(self, dsn, cache_dir=get_cache_dir()) -> None: self._log = logging.getLogger("velbus") + self._loop = asyncio.get_event_loop() + + self._protocol = VelbusProtocol( + message_received_callback=self._on_message_received, + connection_lost_callback=self._on_connection_lost, + loop=self._loop, + ) + self._closing = False + self._auto_reconnect = True + self._dsn = dsn - self._parser = VelbusParser() self._handler = PacketHandler(self.send, self) - self._writer = None - self._reader = None self._modules = {} self._submodules = [] self._send_queue = asyncio.Queue() - self._tasks = [] self._cache_dir = cache_dir # make sure the cachedir exists pathlib.Path(self._cache_dir).mkdir(parents=True, exist_ok=True) + async def _on_message_received(self, msg: RawMessage): + await self._handler.handle(msg) + + def _on_connection_lost(self, exc: Exception): + """Respond to Protocol connection lost.""" + if self._auto_reconnect and not self._closing: + self._log.debug("Reconnecting to transport") + asyncio.ensure_future(self.connect(), loop=self._loop) + async def add_module( self, addr: str, @@ -118,10 +135,9 @@ def get_channels(self, addr: str) -> None | dict: return None async def stop(self) -> None: - for task in self._tasks: - task.cancel() - self._writer.close() - await self._writer.wait_closed() + self._closing = True + self._auto_reconnect = False + self._protocol.close() async def connect(self, test_connect: bool = False) -> None: """ @@ -131,24 +147,24 @@ async def connect(self, test_connect: bool = False) -> None: if ":" in self._dsn: # tcp/ip combination if self._dsn.startswith("tls://"): - tmp = self._dsn.replace("tls://", "").split(":") + host, port = self._dsn.replace("tls://", "").split(":") ctx = ssl._create_unverified_context() else: - tmp = self._dsn.split(":") + host, port = self._dsn.split(":") ctx = None try: - self._reader, self._writer = await asyncio.open_connection( - tmp[0], tmp[1], ssl=ctx + _transport, _protocol = await self._loop.create_connection( + lambda: self._protocol, host=host, port=port, ssl=ctx ) + except ConnectionRefusedError as err: raise VelbusConnectionFailed() from err else: # serial port try: - ( - self._reader, - self._writer, - ) = await serial_asyncio.open_serial_connection( + _transport, _protocol = await serial_asyncio.create_serial_connection( + self._loop, + lambda: self._protocol, url=self._dsn, baudrate=38400, bytesize=serial.EIGHTBITS, @@ -161,10 +177,7 @@ async def connect(self, test_connect: bool = False) -> None: raise VelbusConnectionFailed() from err if test_connect: return - # create reader, parser and writer tasks - self._tasks.append(asyncio.Task(self._socket_read_task())) - self._tasks.append(asyncio.Task(self._socket_send_task())) - self._tasks.append(asyncio.Task(self._parser_task())) + # scan the bus await self.scan() @@ -173,7 +186,7 @@ async def scan(self) -> None: for addr in range(1, 255): msg = ModuleTypeRequestMessage(addr) await self.send(msg) - await asyncio.sleep(30) + await asyncio.sleep(15) self._handler.scan_finished() # calculate how long to wait calc_timeout = len(self._modules) * 30 @@ -199,50 +212,26 @@ async def _check_if_modules_are_loaded(self) -> None: for mod in (self.get_modules()).values(): if mod.is_loaded(): mods_loaded += 1 + else: + self._log.warning(f"Waiting for module {mod._address}") if mods_loaded == len(self.get_modules()): self._log.info("All modules loaded") return self._log.info("Not all modules loaded yet, waiting 30 seconds") await asyncio.sleep(230) - async def send(self, msg) -> None: + async def send(self, msg: Message) -> None: """ Send a packet """ - await self._send_queue.put(msg) - - async def _socket_send_task(self) -> None: - """ - Task to send the packet from the queue to the bus - """ - while self._send_queue: - msg = await self._send_queue.get() - self._log.debug(f"SENDING message: {msg}") - # print(':'.join('{:02X}'.format(x) for x in msg.to_binary())) - try: - self._writer.write(msg.to_binary()) - except Exception: - raise VelbusConnectionTerminated() - await asyncio.sleep(0.11) - - async def _socket_read_task(self) -> None: - """ - Task to read from a socket and push into a queue - """ - while True: - try: - data = await self._reader.read(10) - except Exception: - raise VelbusConnectionTerminated() - self._parser.feed(data) - - async def _parser_task(self) -> None: - """ - Task to parser the received queue - """ - while True: - packet = await self._parser.wait_for_packet() - await self._handler.handle(packet) + await self._protocol.send_message( + RawMessage( + priority=msg.priority, + address=msg.address, + rtr=msg.rtr, + data=msg.data_to_binary(), + ) + ) def get_all(self, class_name: str) -> list: lst = [] diff --git a/velbusaio/handler.py b/velbusaio/handler.py index a16d4d1..12d44e1 100644 --- a/velbusaio/handler.py +++ b/velbusaio/handler.py @@ -16,6 +16,7 @@ from velbusaio.message import Message from velbusaio.messages.module_subtype import ModuleSubTypeMessage from velbusaio.messages.module_type import ModuleTypeMessage +from velbusaio.raw_message import RawMessage class PacketHandler: @@ -39,27 +40,29 @@ def scan_finished(self) -> None: def scan_started(self) -> None: self._scan_complete = False - async def handle(self, data: str) -> None: + async def handle(self, rawmsg: RawMessage) -> None: """ Handle a received packet """ - priority = data[1] - address = int(data[2]) - rtr = data[3] & RTR == RTR - data_size = data[3] & 0x0F - command_value = data[4] - if data_size < 1: + if rawmsg.address < 1 or rawmsg.address > 254: return - if address < 1 or address > 254: + if not rawmsg.command: return + + priority = rawmsg.priority + address = rawmsg.address + rtr = rawmsg.rtr + command_value = rawmsg.command + data = rawmsg.data_only + if command_value == 0xFF and not self._scan_complete: msg = ModuleTypeMessage() - msg.populate(priority, address, rtr, data[5:-2]) + msg.populate(priority, address, rtr, data) self._log.debug(f"Received {msg}") await self._handle_module_type(msg) elif command_value == 0xB0 and not self._scan_complete: msg = ModuleSubTypeMessage() - msg.populate(priority, address, rtr, data[5:-2]) + msg.populate(priority, address, rtr, data) self._log.debug(f"Received {msg}") await self._handle_module_subtype(msg) elif command_value in self.pdata["MessagesBroadCast"]: @@ -73,7 +76,7 @@ async def handle(self, data: str) -> None: if commandRegistry.has_command(int(command_value), module_type): command = commandRegistry.get_command(command_value, module_type) msg = command() - msg.populate(priority, address, rtr, data[5:-2]) + msg.populate(priority, address, rtr, data) self._log.debug(f"Received {msg}") # send the message to the modules await (self._velbus.get_module(msg.address)).on_message(msg) diff --git a/velbusaio/helpers.py b/velbusaio/helpers.py index d1a3088..524ccdb 100644 --- a/velbusaio/helpers.py +++ b/velbusaio/helpers.py @@ -27,20 +27,6 @@ def keys_exists(element, *keys) -> dict: return _element -def checksum(arr) -> int: - """ - Calculate checksum of the given array. - The checksum is calculated by summing all values in an array, then performing the two's complement. - :param arr: The array of bytes of which the checksum has to be calculated of. - :return: The checksum of the given array. - """ - crc = sum(arr) - crc = crc ^ 255 - crc = crc + 1 - crc = crc & 255 - return crc - - def h2(inp) -> str: """ Format as hex upercase diff --git a/velbusaio/message.py b/velbusaio/message.py index e2c4238..aef2ce4 100644 --- a/velbusaio/message.py +++ b/velbusaio/message.py @@ -5,15 +5,7 @@ import json -from velbusaio.const import ( - ETX, - PRIORITY_FIRMWARE, - PRIORITY_HIGH, - PRIORITY_LOW, - RTR, - STX, -) -from velbusaio.helpers import checksum +from velbusaio.const import PRIORITY_FIRMWARE, PRIORITY_HIGH, PRIORITY_LOW, RTR class ParserError(Exception): @@ -68,24 +60,6 @@ def set_address(self, address: int) -> None: """ self.address = address - def to_binary(self): - """ - :return: bytes - """ - data_bytes = self.data_to_binary() - if self.rtr: - rtr_and_size = RTR | len(data_bytes) - else: - rtr_and_size = len(data_bytes) - header = bytearray([STX, self.priority, self.address, rtr_and_size]) - checksum_string = checksum(header + data_bytes) - return ( - header - + data_bytes - + bytearray.fromhex(f"{checksum_string:02x}") - + bytearray([ETX]) - ) - def data_to_binary(self): """ :return: bytes @@ -101,7 +75,7 @@ def to_json_basic(self): return { "name": self.__class__.__name__, "priority": self.priority, - "address": self.address, + "address": f"{self.address:x}", "rtr": self.rtr, } diff --git a/velbusaio/messages/cover_down.py b/velbusaio/messages/cover_down.py index 4a08795..939e701 100644 --- a/velbusaio/messages/cover_down.py +++ b/velbusaio/messages/cover_down.py @@ -4,7 +4,6 @@ from __future__ import annotations import json -import logging import struct from velbusaio.command_registry import register_command @@ -71,7 +70,6 @@ def __init__(self, address=None): Message.__init__(self) self.channel = 0 self.delay_time = 0 - self.logger = logging.getLogger("velbus") self.set_defaults(address) def populate(self, priority, address, rtr, data): diff --git a/velbusaio/messages/module_status.py b/velbusaio/messages/module_status.py index c66d344..70ed03d 100644 --- a/velbusaio/messages/module_status.py +++ b/velbusaio/messages/module_status.py @@ -160,3 +160,4 @@ def data_to_binary(self): register_command(COMMAND_CODE, ModuleStatusMessage2, "VMBDME") register_command(COMMAND_CODE, ModuleStatusMessage2, "VMB1RYS") register_command(COMMAND_CODE, ModuleStatusPirMessage, "VMBIRO") +register_command(COMMAND_CODE, ModuleStatusPirMessage, "VMBPIRM") diff --git a/velbusaio/messages/restore_dimmer.py b/velbusaio/messages/restore_dimmer.py index 405297e..7dbd912 100644 --- a/velbusaio/messages/restore_dimmer.py +++ b/velbusaio/messages/restore_dimmer.py @@ -38,7 +38,7 @@ def populate(self, priority, address, rtr, data): self.set_attributes(priority, address, rtr) self.dimmer_channels = self.byte_to_channels(data[0]) self.dimmer_transitiontime = int.from_bytes( - data[[2, 3]], byteorder="big", signed=False + data[2:3], byteorder="big", signed=False ) def to_json(self): diff --git a/velbusaio/messages/set_date.py b/velbusaio/messages/set_date.py index 1c2a72c..23c0572 100644 --- a/velbusaio/messages/set_date.py +++ b/velbusaio/messages/set_date.py @@ -4,7 +4,6 @@ from __future__ import annotations import json -import logging import time from velbusaio.command_registry import register_command @@ -20,7 +19,6 @@ class SetDate(Message): def __init__(self, address=0x00): Message.__init__(self) - self.logger = logging.getLogger("velbus") self._day = None self._mon = None self._year = None diff --git a/velbusaio/messages/set_daylight_saving.py b/velbusaio/messages/set_daylight_saving.py index b6c07a6..744adaa 100644 --- a/velbusaio/messages/set_daylight_saving.py +++ b/velbusaio/messages/set_daylight_saving.py @@ -4,7 +4,6 @@ from __future__ import annotations import json -import logging import time from velbusaio.command_registry import register_command @@ -20,7 +19,6 @@ class SetDaylightSaving(Message): def __init__(self, address=0x00): Message.__init__(self) - self.logger = logging.getLogger("velbus") self._ds = None self.set_defaults(address) diff --git a/velbusaio/messages/set_dimmer.py b/velbusaio/messages/set_dimmer.py index 4023eda..4bf9809 100644 --- a/velbusaio/messages/set_dimmer.py +++ b/velbusaio/messages/set_dimmer.py @@ -4,7 +4,6 @@ from __future__ import annotations import json -import logging from velbusaio.command_registry import register_command from velbusaio.message import Message @@ -21,7 +20,6 @@ class SetDimmerMessage(Message): def __init__(self, address=None): Message.__init__(self) self.dimmer_channels = [] - self.logger = logging.getLogger("velbus") self.dimmer_state = 0 self.dimmer_transitiontime = 0 self.set_defaults(address) diff --git a/velbusaio/messages/set_realtime_clock.py b/velbusaio/messages/set_realtime_clock.py index cc7f085..89c6c52 100644 --- a/velbusaio/messages/set_realtime_clock.py +++ b/velbusaio/messages/set_realtime_clock.py @@ -4,7 +4,6 @@ from __future__ import annotations import json -import logging import time from velbusaio.command_registry import register_command @@ -20,7 +19,6 @@ class SetRealtimeClock(Message): def __init__(self, address=0x00): Message.__init__(self) - self.logger = logging.getLogger("velbus") self._wday = None self._hour = None self._min = None diff --git a/velbusaio/messages/switch_relay_off.py b/velbusaio/messages/switch_relay_off.py index 0fe3a18..d17e6a0 100644 --- a/velbusaio/messages/switch_relay_off.py +++ b/velbusaio/messages/switch_relay_off.py @@ -4,7 +4,6 @@ from __future__ import annotations import json -import logging from velbusaio.command_registry import register_command from velbusaio.message import Message @@ -21,7 +20,6 @@ class SwitchRelayOffMessage(Message): def __init__(self, address=None): Message.__init__(self) self.relay_channels = [] - self.logger = logging.getLogger("velbus") self.set_defaults(address) def populate(self, priority, address, rtr, data): diff --git a/velbusaio/messages/switch_relay_on.py b/velbusaio/messages/switch_relay_on.py index a67d0bc..ec797c6 100644 --- a/velbusaio/messages/switch_relay_on.py +++ b/velbusaio/messages/switch_relay_on.py @@ -4,7 +4,6 @@ from __future__ import annotations import json -import logging from velbusaio.command_registry import register_command from velbusaio.message import Message @@ -21,7 +20,6 @@ class SwitchRelayOnMessage(Message): def __init__(self, address=None): Message.__init__(self) self.relay_channels = [] - self.logger = logging.getLogger("velbus") self.set_defaults(address) def set_defaults(self, address): diff --git a/velbusaio/module_registry.py b/velbusaio/module_registry.py index 04bcdab..0a8d554 100644 --- a/velbusaio/module_registry.py +++ b/velbusaio/module_registry.py @@ -38,7 +38,7 @@ 0x22: "VMB7IN", 0x28: "VMBGPOD", 0x29: "VMB1RYNOS", - 0x2A: "VMBIRM", + 0x2A: "VMBPIRM", 0x2B: "VMBIRC", 0x2C: "VMBIRO", 0x2D: "VMBGP4PIR", diff --git a/velbusaio/parser.py b/velbusaio/parser.py deleted file mode 100644 index 9fb655d..0000000 --- a/velbusaio/parser.py +++ /dev/null @@ -1,142 +0,0 @@ -""" -:author: Maikel Punie -""" -from __future__ import annotations - -import asyncio -import itertools -import logging -from collections import deque - -from velbusaio.const import ( - ETX, - HEADER_LENGTH, - LENGTH_MASK, - MAX_DATA_AMOUNT, - MIN_PACKET_LENGTH, - PRIORITIES, - STX, -) -from velbusaio.helpers import checksum - - -class VelbusParser: - """ - Transform Velbus message from wire format to Message object - """ - - def __init__(self): - super().__init__() - self.logger = logging.getLogger("velbus-parser") - self.buffer = deque(maxlen=10000) - - def feed(self, data): - """ - Feed received data in the buffer - """ - self.buffer.extend(bytearray(data)) - - # async def _next(self): - # packet = None - # has_valid_packet = self._has_valid_packet_waiting() - # while not has_valid_packet: - # if len(self.buffer) > HEADER_LENGTH and self.__has_packet_length_waiting(): - # self.__realign_buffer() - # has_valid_packet = self._has_valid_packet_waiting() - # await asyncio.sleep(1) - # - # if has_valid_packet: - # packet = self._extract_packet() - # return packet - - async def wait_for_packet(self): - """ - Wait for a valid apcket - """ - while not self._has_valid_packet_waiting(): - await asyncio.sleep(0.1) - return self._extract_packet() - - def _has_valid_packet_waiting(self): - """ - Checks whether or not the parser has a valid packet in its buffer. - :return: A boolean indicating whether or not the parser has a valid packet in its buffer. - TODO Fix - """ - if not self.__has_valid_header_waiting(): - return False - if len(self.buffer) < MIN_PACKET_LENGTH: - return False - return self.__has_packet_length_waiting() or False - # bytes_to_check = bytearray( - # itertools.islice(self.buffer, 0, 4 + self.__curr_packet_body_length()) - # ) - # checksum_valid = self.buffer[(self.__curr_packet_length() - 2)] == checksum( - # bytes_to_check - # ) - # end_valid = self.buffer[(self.__curr_packet_length() - 1)] == ETX - # return checksum_valid and end_valid - - def __has_valid_header_waiting(self): - """ - Checks whether or not the parser has a valid packet header waiting. - :return: A boolean indicating whether or not the parser has a valid packet header waiting. - """ - if len(self.buffer) < HEADER_LENGTH: - return False - start_valid = self.buffer[0] == STX - bodysize_valid = self.__curr_packet_body_length() <= MAX_DATA_AMOUNT - priority_valid = self.buffer[1] in PRIORITIES - return start_valid and bodysize_valid and priority_valid - - def __has_packet_length_waiting(self): - """ - Checks whether the current packet has the full length's worth of data waiting in the buffer. - This should only be called when __has_valid_header_waiting() returns True. - """ - return len(self.buffer) >= self.__curr_packet_length() - - def __curr_packet_length(self): - """ - Gets the current waiting packet's total length. - This should only be called when __has_valid_header_waiting() returns True. - :return: The current waiting packet's total length. - """ - return MIN_PACKET_LENGTH + self.__curr_packet_body_length() - - def __curr_packet_body_length(self): - """ - Gets the current waiting packet's body length. - This should only be called when __has_valid_header_waiting() returns True. - :return: The current waiting packet's body length. - """ - return self.buffer[3] & LENGTH_MASK - - def _extract_packet(self): - """ - Extracts a packet from the buffer and shifts it. - Make sure this is only called after __has_valid_packet_waiting() return True. - :return: A bytearray with the currently waiting packet. - """ - length = self.__curr_packet_length() - packet = bytearray(itertools.islice(self.buffer, 0, length)) - self.__shift_buffer(length) - return packet - - def __realign_buffer(self): - """ - Realigns buffer by shifting the queue until the next STX or until the buffer runs out. - """ - amount = 1 - while amount < len(self.buffer) and self.buffer[amount] != STX: - amount += 1 - - self.__shift_buffer(amount) - - def __shift_buffer(self, amount): - """ - Shifts the buffer by the specified amount. - :param amount: The amount of bytes that the buffer needs to be shifted. - """ - for _ in itertools.repeat(None, amount): - self.buffer.popleft() diff --git a/velbusaio/protocol.py b/velbusaio/protocol.py new file mode 100644 index 0000000..ec998d1 --- /dev/null +++ b/velbusaio/protocol.py @@ -0,0 +1,184 @@ +import asyncio +import binascii +import logging +import typing as t +from asyncio import transports + +import backoff + +from velbusaio.const import MAXIMUM_MESSAGE_SIZE, MINIMUM_MESSAGE_SIZE, SLEEP_TIME +from velbusaio.messages.module_type_request import ModuleTypeRequestMessage +from velbusaio.raw_message import RawMessage +from velbusaio.raw_message import create as create_message_info + + +def _on_write_backoff(details): + logging.debug( + "Transport is not open, waiting {wait} seconds after {tries}", + wait=details.wait, + tries=details.tries, + ) + + +class VelbusProtocol(asyncio.BufferedProtocol): + """Handles the Velbus protocol + + This class is expected to be wrapped inside a VelbusConnection class object which will maintain the socket + and handle auto-reconnects""" + + def __init__( + self, + loop, + message_received_callback: t.Callable[[RawMessage], None], + connection_lost_callback=None, + ) -> None: + super().__init__() + self._log = logging.getLogger("velbus-protocol") + self._loop = loop + self._message_received_callback = message_received_callback + self._connection_lost_callback = connection_lost_callback + + # everything for reading from Velbus + self._buffer = bytearray(MAXIMUM_MESSAGE_SIZE) + self._buffer_view = memoryview(self._buffer) + self._buffer_pos = 0 + + self.transport = None + + # everything for writing to Velbus + self._send_queue = asyncio.Queue(loop=self._loop) + self._write_transport_lock = asyncio.Lock(loop=self._loop) + self._writer_task = None + self._restart_writer = False + self.restart_writing() + + self._closing = False + + def connection_made(self, transport: transports.BaseTransport) -> None: + self.transport = transport + self._log.info("Connection established to Velbus") + + self._restart_writer = True + self.restart_writing() + + async def pause_writing(self): + """Pause writing.""" + self._restart_writer = False + if self._writer_task: + self._send_queue.put_nowait(None) + await asyncio.sleep(0.1) + + def restart_writing(self): + """Resume writing.""" + if self._restart_writer and not self._write_transport_lock.locked(): + self._writer_task = asyncio.ensure_future( + self._get_message_from_send_queue(), loop=self._loop + ) + self._writer_task.add_done_callback(lambda _future: self.restart_writing()) + + def close(self): + self._closing = True + self._restart_writer = False + if self.transport: + self.transport.close() + + def connection_lost(self, exc: t.Optional[Exception]) -> None: + self.transport = None + + if self._closing: + return # Connection loss was expected, nothing to do here... + elif exc is None: + self._log.warning("EOF received from Velbus") + else: + self._log.error(f"Velbus connection lost: {exc!r}") + + self.transport = None + asyncio.ensure_future(self.pause_writing(), loop=self._loop) + if self._connection_lost_callback: + self._connection_lost_callback(exc) + + # Everything read-related + + def get_buffer(self, sizehint): + return self._buffer_view[self._buffer_pos :] + + def buffer_updated(self, nbytes: int) -> None: + """Receive data from the protocol. + Called when asyncio.BufferedProtocol detects received data from network. + """ + self._buffer_pos += nbytes + self._log.debug( + "Received {nbytes} bytes from Velbus: {data_hex}".format( + nbytes=nbytes, + data_hex=binascii.hexlify( + self._buffer[self._buffer_pos - nbytes : self._buffer_pos], " " + ), + ) + ) + + if self._buffer_pos > MINIMUM_MESSAGE_SIZE: + # try to construct a Velbus message from the buffer + msg, remaining_data = create_message_info(self._buffer) + + if msg: + asyncio.ensure_future(self._process_message(msg), loop=self._loop) + + self._new_buffer(remaining_data) + + def _new_buffer(self, remaining_data=None): + new_buffer = bytearray(MAXIMUM_MESSAGE_SIZE) + if remaining_data: + new_buffer[: len(remaining_data)] = remaining_data + + self._buffer = new_buffer + self._buffer_pos = len(remaining_data) if remaining_data else 0 + self._buffer_view = memoryview(self._buffer) + + async def _process_message(self, msg: RawMessage): + self._log.debug(f"RX: {msg}") + await self._message_received_callback(msg) + + # Everything write-related + + async def send_message(self, msg: RawMessage): + self._send_queue.put_nowait(msg) + + async def _get_message_from_send_queue(self): + self._log.debug("Starting Velbus write message from send queue") + self._log.debug("Acquiring write lock") + await self._write_transport_lock.acquire() + while self._restart_writer: + # wait for an item from the queue + msg_info = await self._send_queue.get() + if msg_info is None: + self._restart_writer = False + return + message_sent = False + try: + while not message_sent: + message_sent = await self._write_message(msg_info) + await asyncio.sleep(SLEEP_TIME, loop=self._loop) + except (asyncio.CancelledError, GeneratorExit) as exc: + if not self._closing: + self._log.error(f"Stopping Velbus writer due to {exc!r}") + self._restart_writer = False + except Exception as exc: + self._log.error(f"Restarting Velbus writer due to {exc!r}") + self._restart_writer = True + if self._write_transport_lock.locked(): + self._write_transport_lock.release() + self._log.debug("Ending Velbus write message from send queue") + + @backoff.on_predicate( + backoff.expo, + lambda is_sent: not is_sent, + max_tries=10, + on_backoff=_on_write_backoff, + ) + async def _write_message(self, msg: RawMessage): + self._log.debug(f"TX: {msg}") + if not self.transport.is_closing(): + self.transport.write(msg.to_bytes()) + return True + else: + return False diff --git a/velbusaio/raw_message.py b/velbusaio/raw_message.py new file mode 100644 index 0000000..9861c6b --- /dev/null +++ b/velbusaio/raw_message.py @@ -0,0 +1,154 @@ +import binascii +import logging +from typing import NamedTuple, Optional, Tuple + +from velbusaio.const import ( + END_BYTE, + HEADER_LENGTH, + MAXIMUM_MESSAGE_SIZE, + MINIMUM_MESSAGE_SIZE, + NO_RTR, + PRIORITIES, + RTR, + START_BYTE, + TAIL_LENGTH, +) +from velbusaio.util import checksum +from velbusaio.util import checksum as calculate_checksum + + +class RawMessage(NamedTuple): + priority: int + address: int + rtr: bool + data: bytes + + @property + def command(self): + return self.data[0] if len(self.data) > 0 else None + + @property + def data_only(self): + return self.data[1:] if len(self.data) > 1 else None + + def to_bytes(self) -> bytes: + """ + :return: bytes + """ + + # create header: + header_bytes = bytes( + [ + START_BYTE, + self.priority, + self.address, + (RTR if self.rtr else NO_RTR) | len(self.data), + ] + ) + + tail_bytes = bytes([checksum(header_bytes + self.data), END_BYTE]) + + return header_bytes + self.data + tail_bytes + + def __repr__(self) -> str: + return ( + f"RawMessage(priority={self.priority:02x}, address={self.address:02x}," + f" rtr={self.rtr!r}, command={self.command}," + f" data={binascii.hexlify(self.data, ' ')})" + ) + + +def create(rawmessage: bytearray) -> Tuple[Optional[RawMessage], bytearray]: + rawmessage = _trim_buffer_garbage(rawmessage) + + while True: + if len(rawmessage) < MINIMUM_MESSAGE_SIZE: + logging.debug("Buffer does not yet contain a full message") + return None, rawmessage + + try: + logging.debug(f"Receive: {rawmessage}") + return _parse(rawmessage) + except ParseError: + logging.exception( + f"Could not parse the message {binascii.hexlify(rawmessage)}. Truncating invalid data." + ) + rawmessage = _trim_buffer_garbage( + rawmessage[1:] + ) # try to find possible start of a message + + +class ParseError(Exception): + pass + + +def _parse(rawmessage: bytearray) -> Tuple[Optional[RawMessage], bytearray]: + assert ( + MINIMUM_MESSAGE_SIZE <= len(rawmessage) <= MAXIMUM_MESSAGE_SIZE + ), "Received a raw message with an illegal length" + assert rawmessage[0] == START_BYTE + + priority = rawmessage[1] + if priority not in PRIORITIES: + raise ParseError( + f"Invalid priority byte: {priority:02x} in {binascii.hexlify(rawmessage)}" + ) + + address = rawmessage[2] + + rtr = rawmessage[3] & RTR == RTR # high nibble of the 4th byte + data_size = rawmessage[3] & 0x0F # low nibble of the 4th byte + + if HEADER_LENGTH + data_size + TAIL_LENGTH > len(rawmessage): + return ( + None, + rawmessage, + ) # the full package is not available in the current buffer + + if rawmessage[HEADER_LENGTH + data_size + 1] != END_BYTE: + raise ParseError(f"Invalid end byte in {binascii.hexlify(rawmessage)}") + + checksum = rawmessage[HEADER_LENGTH + data_size] + + calculated_checksum = calculate_checksum(rawmessage[: HEADER_LENGTH + data_size]) + + if calculated_checksum != checksum: + raise ParseError( + f"Invalid checksum: expected {calculated_checksum:02x}," + f" but got {checksum:02x} in {binascii.hexlify(rawmessage)}" + ) + + data = bytes(rawmessage[HEADER_LENGTH : HEADER_LENGTH + data_size]) + + return ( + RawMessage(priority, address, rtr, data), + rawmessage[HEADER_LENGTH + data_size + TAIL_LENGTH :], + ) + + +def _trim_buffer_garbage(rawmessage): + """ + Remove leading garbage bytes from a byte stream. + """ + + # A proper message byte stream begins with 0x0F. + if rawmessage and rawmessage[0] != START_BYTE: + start_index = rawmessage.find(START_BYTE) + if start_index > -1: + logging.debug( + "Trimming leading garbage from buffer content: {buffer} becomes {new_buffer}".format( + buffer=binascii.hexlify(rawmessage), + new_buffer=binascii.hexlify(rawmessage[start_index:]), + ) + ) + return rawmessage[start_index:] + else: + logging.debug( + "Trimming whole buffer as it does not contain the start byte: {buffer}".format( + buffer=binascii.hexlify(rawmessage) + ) + ) + return [] + + else: + return rawmessage diff --git a/velbusaio/util.py b/velbusaio/util.py new file mode 100644 index 0000000..ee7c166 --- /dev/null +++ b/velbusaio/util.py @@ -0,0 +1,49 @@ +from velbusaio.const import MAXIMUM_MESSAGE_SIZE, MINIMUM_MESSAGE_SIZE + + +# Copyright (c) 2017 Thomas Delaet +# Copied from python-velbus (https://github.com/thomasdelaet/python-velbus) +def checksum(data) -> int: + """ + :return: int + """ + assert len(data) >= MINIMUM_MESSAGE_SIZE - 2 + assert len(data) <= MAXIMUM_MESSAGE_SIZE - 2 + __checksum = 0 + for data_byte in data: + __checksum += data_byte + __checksum = -(__checksum % 256) + 256 + return __checksum % 256 + + +class VelbusException(Exception): + """Velbus Exception.""" + + def __init__(self, value): + Exception.__init__(self) + self.value = value + + def __str__(self): + return repr(self.value) + + +class MessageParseException(Exception): + pass + + +class BitSet: + def __init__(self, value): + self._value = value + + def __getitem__(self, idx): + assert 0 <= idx < 8 + return bool((1 << idx) & self._value) + + def __setitem__(self, idx, value): + assert 0 <= idx < 8 + assert isinstance(value, bool) + mask = (0xFF ^ (1 << idx)) & self._value + self._value = mask & (value << idx) + + def __len__(self): + return 8 # a bitset represents one byte