Skip to content

Commit

Permalink
Move to asyncio Protocol instead of reader/writer (#11)
Browse files Browse the repository at this point in the history
* Move to asyncio Protocol instead of reader/writer

* Fix _on_connection_lost

* Update on PR

* Pre-commit update

* Increased the version

* create explicit copy of data bytes

Co-authored-by: Thijs Walcarius <[email protected]>
Co-authored-by: cereal2nd <[email protected]>
  • Loading branch information
3 people authored Oct 13, 2021
1 parent 0a5cfec commit cb64dd6
Show file tree
Hide file tree
Showing 22 changed files with 482 additions and 280 deletions.
2 changes: 1 addition & 1 deletion examples/load_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async def main():
# velbus = Velbus("192.168.1.9:27015")
# example via serial device
# velbus = Velbus("/dev/ttyAMA0")
velbus = Velbus("tls://192.168.1.9:27015")
velbus = Velbus("192.168.1.254:8445")
await velbus.connect()
for mod in (velbus.get_modules()).values():
print(mod)
Expand Down
5 changes: 4 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
pyserial-asyncio
backoff>=1.10.0
backoff>=1.10.0,<1.11
pyserial==3.5.0
pyserial-asyncio>=0.5
8 changes: 6 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@

setup(
name="velbus-aio",
version="2021.9.4",
version="2021.10.1",
url="https://github.com/Cereal2nd/velbus-aio",
license="MIT",
author="Maikel Punie",
install_requires=["pyserial-asyncio"],
install_requires=[
"pyserial==3.5.0",
"pyserial-asyncio>=0.5",
"backoff>=1.10.0",
],
author_email="[email protected]",
packages=PACKAGES,
include_package_data=True,
Expand Down
27 changes: 19 additions & 8 deletions velbusaio/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,26 @@
PRIORITY_LOW,
PRIORITY_THIRDPARTY,
]
STX: Final = 0x0F
ETX: Final = 0x04


HEADER_LENGTH: Final = 4 # Header: [Start Byte, priority, address, RTR+data length]
TAIL_LENGTH: Final = 2 # Tail: [CRC, End Byte]
MAX_BODY_SIZE: Final = 8 # Maximum amount of data bytes in a packet

MINIMUM_MESSAGE_SIZE: Final = (
HEADER_LENGTH + TAIL_LENGTH
) # Smallest possible packet: [Start Byte, priority, address, RTR+data length, CRC, End Byte]
MAXIMUM_MESSAGE_SIZE: Final = MINIMUM_MESSAGE_SIZE + MAX_BODY_SIZE

START_BYTE: Final = 0x0F
END_BYTE: Final = 0x04


LENGTH_MASK: Final = 0x0F
HEADER_LENGTH: Final = 4 # Header: [STX, priority, address, RTR+data length]
MAX_DATA_AMOUNT: Final = 8 # Maximum amount of data bytes in a packet
MIN_PACKET_LENGTH: Final = (
6 # Smallest possible packet: [STX, priority, address, RTR+data length, CRC, ETC]
)
MAX_PACKET_LENGTH: Final = MIN_PACKET_LENGTH + MAX_DATA_AMOUNT

RTR: Final = 0x40
NO_RTR: Final = 0x00

CACHEDIR: Final = ".velbuscache"
LOAD_TIMEOUT: Final = 600

Expand All @@ -42,3 +51,5 @@
CHANNEL_EDGE_LIT: Final = 97
CHANNEL_MEMO_TEXT: Final = 98
CHANNEL_LIGHT_VALUE: Final = 99

SLEEP_TIME = 60 / 1000
103 changes: 46 additions & 57 deletions velbusaio/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
from velbusaio.exceptions import VelbusConnectionFailed, VelbusConnectionTerminated
from velbusaio.handler import PacketHandler
from velbusaio.helpers import get_cache_dir
from velbusaio.message import Message
from velbusaio.messages.module_type_request import ModuleTypeRequestMessage
from velbusaio.messages.set_date import SetDate
from velbusaio.messages.set_daylight_saving import SetDaylightSaving
from velbusaio.messages.set_realtime_clock import SetRealtimeClock
from velbusaio.module import Module
from velbusaio.parser import VelbusParser
from velbusaio.protocol import VelbusProtocol
from velbusaio.raw_message import RawMessage


class Velbus:
Expand All @@ -31,19 +33,34 @@ class Velbus:

def __init__(self, dsn, cache_dir=get_cache_dir()) -> None:
self._log = logging.getLogger("velbus")
self._loop = asyncio.get_event_loop()

self._protocol = VelbusProtocol(
message_received_callback=self._on_message_received,
connection_lost_callback=self._on_connection_lost,
loop=self._loop,
)
self._closing = False
self._auto_reconnect = True

self._dsn = dsn
self._parser = VelbusParser()
self._handler = PacketHandler(self.send, self)
self._writer = None
self._reader = None
self._modules = {}
self._submodules = []
self._send_queue = asyncio.Queue()
self._tasks = []
self._cache_dir = cache_dir
# make sure the cachedir exists
pathlib.Path(self._cache_dir).mkdir(parents=True, exist_ok=True)

async def _on_message_received(self, msg: RawMessage):
await self._handler.handle(msg)

def _on_connection_lost(self, exc: Exception):
"""Respond to Protocol connection lost."""
if self._auto_reconnect and not self._closing:
self._log.debug("Reconnecting to transport")
asyncio.ensure_future(self.connect(), loop=self._loop)

async def add_module(
self,
addr: str,
Expand Down Expand Up @@ -118,10 +135,9 @@ def get_channels(self, addr: str) -> None | dict:
return None

async def stop(self) -> None:
for task in self._tasks:
task.cancel()
self._writer.close()
await self._writer.wait_closed()
self._closing = True
self._auto_reconnect = False
self._protocol.close()

async def connect(self, test_connect: bool = False) -> None:
"""
Expand All @@ -131,24 +147,24 @@ async def connect(self, test_connect: bool = False) -> None:
if ":" in self._dsn:
# tcp/ip combination
if self._dsn.startswith("tls://"):
tmp = self._dsn.replace("tls://", "").split(":")
host, port = self._dsn.replace("tls://", "").split(":")
ctx = ssl._create_unverified_context()
else:
tmp = self._dsn.split(":")
host, port = self._dsn.split(":")
ctx = None
try:
self._reader, self._writer = await asyncio.open_connection(
tmp[0], tmp[1], ssl=ctx
_transport, _protocol = await self._loop.create_connection(
lambda: self._protocol, host=host, port=port, ssl=ctx
)

except ConnectionRefusedError as err:
raise VelbusConnectionFailed() from err
else:
# serial port
try:
(
self._reader,
self._writer,
) = await serial_asyncio.open_serial_connection(
_transport, _protocol = await serial_asyncio.create_serial_connection(
self._loop,
lambda: self._protocol,
url=self._dsn,
baudrate=38400,
bytesize=serial.EIGHTBITS,
Expand All @@ -161,10 +177,7 @@ async def connect(self, test_connect: bool = False) -> None:
raise VelbusConnectionFailed() from err
if test_connect:
return
# create reader, parser and writer tasks
self._tasks.append(asyncio.Task(self._socket_read_task()))
self._tasks.append(asyncio.Task(self._socket_send_task()))
self._tasks.append(asyncio.Task(self._parser_task()))

# scan the bus
await self.scan()

Expand All @@ -173,7 +186,7 @@ async def scan(self) -> None:
for addr in range(1, 255):
msg = ModuleTypeRequestMessage(addr)
await self.send(msg)
await asyncio.sleep(30)
await asyncio.sleep(15)
self._handler.scan_finished()
# calculate how long to wait
calc_timeout = len(self._modules) * 30
Expand All @@ -199,50 +212,26 @@ async def _check_if_modules_are_loaded(self) -> None:
for mod in (self.get_modules()).values():
if mod.is_loaded():
mods_loaded += 1
else:
self._log.warning(f"Waiting for module {mod._address}")
if mods_loaded == len(self.get_modules()):
self._log.info("All modules loaded")
return
self._log.info("Not all modules loaded yet, waiting 30 seconds")
await asyncio.sleep(230)

async def send(self, msg) -> None:
async def send(self, msg: Message) -> None:
"""
Send a packet
"""
await self._send_queue.put(msg)

async def _socket_send_task(self) -> None:
"""
Task to send the packet from the queue to the bus
"""
while self._send_queue:
msg = await self._send_queue.get()
self._log.debug(f"SENDING message: {msg}")
# print(':'.join('{:02X}'.format(x) for x in msg.to_binary()))
try:
self._writer.write(msg.to_binary())
except Exception:
raise VelbusConnectionTerminated()
await asyncio.sleep(0.11)

async def _socket_read_task(self) -> None:
"""
Task to read from a socket and push into a queue
"""
while True:
try:
data = await self._reader.read(10)
except Exception:
raise VelbusConnectionTerminated()
self._parser.feed(data)

async def _parser_task(self) -> None:
"""
Task to parser the received queue
"""
while True:
packet = await self._parser.wait_for_packet()
await self._handler.handle(packet)
await self._protocol.send_message(
RawMessage(
priority=msg.priority,
address=msg.address,
rtr=msg.rtr,
data=msg.data_to_binary(),
)
)

def get_all(self, class_name: str) -> list:
lst = []
Expand Down
25 changes: 14 additions & 11 deletions velbusaio/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from velbusaio.message import Message
from velbusaio.messages.module_subtype import ModuleSubTypeMessage
from velbusaio.messages.module_type import ModuleTypeMessage
from velbusaio.raw_message import RawMessage


class PacketHandler:
Expand All @@ -39,27 +40,29 @@ def scan_finished(self) -> None:
def scan_started(self) -> None:
self._scan_complete = False

async def handle(self, data: str) -> None:
async def handle(self, rawmsg: RawMessage) -> None:
"""
Handle a received packet
"""
priority = data[1]
address = int(data[2])
rtr = data[3] & RTR == RTR
data_size = data[3] & 0x0F
command_value = data[4]
if data_size < 1:
if rawmsg.address < 1 or rawmsg.address > 254:
return
if address < 1 or address > 254:
if not rawmsg.command:
return

priority = rawmsg.priority
address = rawmsg.address
rtr = rawmsg.rtr
command_value = rawmsg.command
data = rawmsg.data_only

if command_value == 0xFF and not self._scan_complete:
msg = ModuleTypeMessage()
msg.populate(priority, address, rtr, data[5:-2])
msg.populate(priority, address, rtr, data)
self._log.debug(f"Received {msg}")
await self._handle_module_type(msg)
elif command_value == 0xB0 and not self._scan_complete:
msg = ModuleSubTypeMessage()
msg.populate(priority, address, rtr, data[5:-2])
msg.populate(priority, address, rtr, data)
self._log.debug(f"Received {msg}")
await self._handle_module_subtype(msg)
elif command_value in self.pdata["MessagesBroadCast"]:
Expand All @@ -73,7 +76,7 @@ async def handle(self, data: str) -> None:
if commandRegistry.has_command(int(command_value), module_type):
command = commandRegistry.get_command(command_value, module_type)
msg = command()
msg.populate(priority, address, rtr, data[5:-2])
msg.populate(priority, address, rtr, data)
self._log.debug(f"Received {msg}")
# send the message to the modules
await (self._velbus.get_module(msg.address)).on_message(msg)
Expand Down
14 changes: 0 additions & 14 deletions velbusaio/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,6 @@ def keys_exists(element, *keys) -> dict:
return _element


def checksum(arr) -> int:
"""
Calculate checksum of the given array.
The checksum is calculated by summing all values in an array, then performing the two's complement.
:param arr: The array of bytes of which the checksum has to be calculated of.
:return: The checksum of the given array.
"""
crc = sum(arr)
crc = crc ^ 255
crc = crc + 1
crc = crc & 255
return crc


def h2(inp) -> str:
"""
Format as hex upercase
Expand Down
30 changes: 2 additions & 28 deletions velbusaio/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,7 @@

import json

from velbusaio.const import (
ETX,
PRIORITY_FIRMWARE,
PRIORITY_HIGH,
PRIORITY_LOW,
RTR,
STX,
)
from velbusaio.helpers import checksum
from velbusaio.const import PRIORITY_FIRMWARE, PRIORITY_HIGH, PRIORITY_LOW, RTR


class ParserError(Exception):
Expand Down Expand Up @@ -68,24 +60,6 @@ def set_address(self, address: int) -> None:
"""
self.address = address

def to_binary(self):
"""
:return: bytes
"""
data_bytes = self.data_to_binary()
if self.rtr:
rtr_and_size = RTR | len(data_bytes)
else:
rtr_and_size = len(data_bytes)
header = bytearray([STX, self.priority, self.address, rtr_and_size])
checksum_string = checksum(header + data_bytes)
return (
header
+ data_bytes
+ bytearray.fromhex(f"{checksum_string:02x}")
+ bytearray([ETX])
)

def data_to_binary(self):
"""
:return: bytes
Expand All @@ -101,7 +75,7 @@ def to_json_basic(self):
return {
"name": self.__class__.__name__,
"priority": self.priority,
"address": self.address,
"address": f"{self.address:x}",
"rtr": self.rtr,
}

Expand Down
Loading

0 comments on commit cb64dd6

Please sign in to comment.