Skip to content

Commit

Permalink
Work on stabalizing the connection
Browse files Browse the repository at this point in the history
  • Loading branch information
cereal2nd committed Sep 20, 2024
1 parent 124859c commit d109b36
Showing 1 changed file with 34 additions and 12 deletions.
46 changes: 34 additions & 12 deletions duotecno/controller.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Main interface to the duotecno bus."""

from __future__ import annotations
import asyncio
import logging
import time
from typing import Final
from collections import deque
from duotecno.exceptions import LoadFailure, InvalidPassword
from duotecno.protocol import (
Expand All @@ -14,6 +17,14 @@
from duotecno.node import Node
from duotecno.unit import BaseUnit

PW_TIMEOUT: Final = 5
LOAD_NODE_TIMEOUT: Final = 60
LOAD_UNIT_TIMEOUT: Final = 120
HB_TIMEOUT: Final = 20
HB_BUSEMPTY: Final = 10
MAX_INFLIGHT: Final = 5
STATUS_RETRANSMIT: Final = 2


class PyDuotecno:
"""Class that will will do the bus management.
Expand All @@ -34,6 +45,7 @@ class PyDuotecno:
sendQueue: asyncio.PriorityQueue
connectionOK: asyncio.Event
heartbeatReceived: asyncio.Event
nextHeartbeat: int
packetToWaitFor: str | None = None
packetWaiter: asyncio.Event
nodes: dict[int, Node] = {}
Expand Down Expand Up @@ -64,10 +76,11 @@ async def disconnect(self) -> None:
self.connectionOK.clear()

self.readerTask.cancel()
self.hbTask.cancel()
self.workTask.cancel()
self.writerTask.cancel()
if self.writer:
self.writer.close()
self._log.debug("Waiting to finish disconnecting")
await self.writer.wait_closed()
self._log.debug("Disconnecting Finished")

async def connect(
Expand Down Expand Up @@ -107,16 +120,16 @@ async def _do_connect(self, testOnly: bool = False, skipLoad: bool = False) -> N
# start the bus reading task
self.receiveQueue = asyncio.PriorityQueue()
self.sendQueue = asyncio.PriorityQueue()
self.sendSema = asyncio.Semaphore(5)
self.readerTask = asyncio.Task(self.readTask())
self.sendSema = asyncio.Semaphore(MAX_INFLIGHT)
self.readerTask = asyncio.Task(self._readTask())
self.writerTask = asyncio.Task(self._writeTask())
self.workTask = asyncio.Task(self.handleTask())
self.workTask = asyncio.Task(self._handleTask())
# send login info
passw = [str(ord(i)) for i in self.password]
await self.write(f"[214,3,{len(passw)},{','.join(passw)}]")
# wait for the login to be ok
try:
await asyncio.wait_for(self.waitForPacket("67,3,1"), timeout=5.0)
await asyncio.wait_for(self.waitForPacket("67,3,1"), timeout=PW_TIMEOUT)
except TimeoutError:
await self.disconnect()
raise InvalidPassword()
Expand All @@ -128,12 +141,12 @@ async def _do_connect(self, testOnly: bool = False, skipLoad: bool = False) -> N
await self.write("[209,5]")
await self.write("[209,0]")
try:
await asyncio.wait_for(self._loadTaskNodes(), timeout=60.0)
await asyncio.wait_for(self._loadTaskNodes(), timeout=LOAD_NODE_TIMEOUT)
self._log.info("Nodes discoverd")
for n in self.nodes.values():
await n.load()
await asyncio.sleep(0.1)
await asyncio.wait_for(self._loadTaskUnits(), timeout=120.0)
await asyncio.wait_for(self._loadTaskUnits(), timeout=LOAD_UNIT_TIMEOUT)
self._log.info("Units discoverd")
except TimeoutError:
await self.disconnect()
Expand Down Expand Up @@ -220,22 +233,30 @@ async def heartbeatTask(self) -> None:
await asyncio.sleep(30)
self._log.info("Starting HB task")
while True:
# wait until the timer expire of 5 seconds
while self.nextHeartbeat > int(time.time()):
self._log.debug(
f"Waiting until {self.nextHeartbeat} to send a HB ({time.time()})"
)
await asyncio.sleep(1)
# send the heartbeat
self.heartbeatReceived.clear()
try:
self._log.debug("Sending heartbeat message")
await self.write("[215,1]")
await asyncio.wait_for(self.heartbeatReceived.wait(), timeout=10.0)
await asyncio.wait_for(
self.heartbeatReceived.wait(), timeout=HB_TIMEOUT
)
self._log.debug("Received heartbeat message")
except TimeoutError:
self._log.warning("Timeout on heartbeat, reconnecting")
await self._reconnect()
break
except asyncio.exceptions.CancelledError:
break
await asyncio.sleep(10)
self._log.info("Stopping HB task")

async def readTask(self) -> None:
async def _readTask(self) -> None:
"""Reader task."""
while self.connectionOK.is_set() and self.reader:
try:
Expand All @@ -256,6 +277,7 @@ async def readTask(self) -> None:
if tmp == "":
return
self._log.debug(f'RX: "{tmp}"')
self.nextHeartbeat = int(time.time()) + HB_BUSEMPTY
self.sendSema.release()
if await self._comparePacket(tmp):
p = tmp.split(",")
Expand Down Expand Up @@ -287,7 +309,7 @@ async def waitForPacket(self, pstr: str) -> None:
await self.packetWaiter.wait()
self.packetToWaitFor = None

async def handleTask(self) -> None:
async def _handleTask(self) -> None:
"""handler task."""
while self.connectionOK.is_set() and self.receiveQueue:
try:
Expand Down

0 comments on commit d109b36

Please sign in to comment.