Skip to content

Commit

Permalink
fix: typing hints
Browse files Browse the repository at this point in the history
  • Loading branch information
rpoisel committed Aug 1, 2023
1 parent ecbda54 commit 2f2ad75
Showing 1 changed file with 114 additions and 91 deletions.
205 changes: 114 additions & 91 deletions riden/riden.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Built-in modules
from datetime import datetime
from typing import Optional, Union

# Third-party modules
from modbus_tk import hooks
Expand All @@ -22,8 +23,8 @@ def __init__(
port: str = "/dev/ttyUSB0",
baudrate: int = 115200,
address: int = 1,
serial: Serial = None,
master: RtuMaster = None,
serial: Optional[Serial] = None,
master: Optional[RtuMaster] = None,
close_after_call: bool = False,
timeout: float = 0.5,
):
Expand Down Expand Up @@ -74,49 +75,59 @@ def __init__(

self.update()

def read(self, register: int, length: int = 1) -> int or tuple:
def read(self, register: int, length) -> tuple:
try:
response = self.master.execute(
self.address, READ_HOLDING_REGISTERS, register, length
)
return response if length > 1 else response[0]
return response
except ModbusInvalidResponseError:
return self.read(register, length)

def write(self, register: int, value: int) -> int:
def read_single(self, register: int) -> int:
response = self.read(register, 1)
response_len = len(response)
if response_len > 1:
raise ValueError(f'unexpected response of length {response_len}')
return response[0]

def write_single(self, register: int, value: int) -> int:
try:
return self.master.execute(
self.address, WRITE_SINGLE_REGISTER, register, 1, value
)[0]
except ModbusInvalidResponseError:
return self.write(register, value)
return self.write_single(register, value)

def write_multiple(self, register: int, values: tuple or list) -> tuple:
def write_multiple(self, register: int, values: Union[list,
tuple]) -> tuple:
try:
return self.master.execute(
self.address, WRITE_MULTIPLE_REGISTERS, register, 1, values
self.address, WRITE_MULTIPLE_REGISTERS, register, 1, values # type: ignore
)
except ModbusInvalidResponseError:
return self.write_multiple(register, values)

def init(self) -> None:
data = self.read(R.ID, R.FW + 1)
self.get_id(data[R.ID]),
self.get_sn(data[R.SN_H], data[R.SN_L]),
self.get_fw(data[R.FW]),
self.get_id(data[R.ID]), # type: ignore
self.get_sn(data[R.SN_H], data[R.SN_L]), # type: ignore
self.get_fw(data[R.FW]), # type: ignore

def get_id(self, _id: int = None) -> int:
self.id = _id or self.read(R.ID)
def get_id(self, _id: Optional[int] = None) -> Optional[int]:
self.id = _id or self.read_single(R.ID)
return self.id

def get_sn(self, _sn_h: int = None, _sn_l: int = None) -> str:
_sn_h = _sn_h or self.read(R.SN_H)
_sn_l = _sn_l or self.read(R.SN_L)
def get_sn(self,
_sn_h: Optional[int] = None,
_sn_l: Optional[int] = None) -> str:
_sn_h = _sn_h or self.read_single(R.SN_H)
_sn_l = _sn_l or self.read_single(R.SN_L)
self.sn = "%08d" % (_sn_h << 16 | _sn_l)
return self.sn

def get_fw(self, _fw: int = None) -> int:
self.fw = _fw or self.read(R.FW)
def get_fw(self, _fw: Optional[int] = None) -> int:
self.fw = _fw or self.read_single(R.FW)
return self.fw

def update(self) -> None:
Expand Down Expand Up @@ -149,123 +160,135 @@ def update(self) -> None:
self.get_ah(data[R.AH_H], data[R.AH_L])
self.get_wh(data[R.WH_H], data[R.WH_L])

def get_int_c(self, _int_c_s: int = None, _int_c: int = None) -> int:
_int_c_s = _int_c_s or self.read(R.INT_C_S)
_int_c = _int_c or self.read(R.INT_C)
def get_int_c(self,
_int_c_s: Optional[int] = None,
_int_c: Optional[int] = None) -> int:
_int_c_s = _int_c_s or self.read_single(R.INT_C_S)
_int_c = _int_c or self.read_single(R.INT_C)
sign = -1 if _int_c_s else +1
self.int_c = _int_c * sign
return self.int_c

def get_int_f(self, _int_f_s: int = None, _int_f: int = None) -> int:
_int_f_s = _int_f_s or self.read(R.INT_F_S)
_int_f = _int_f or self.read(R.INT_F)
def get_int_f(self,
_int_f_s: Optional[int] = None,
_int_f: Optional[int] = None) -> int:
_int_f_s = _int_f_s or self.read_single(R.INT_F_S)
_int_f = _int_f or self.read_single(R.INT_F)
sign = -1 if _int_f_s else +1
self.int_f = _int_f * sign
return self.int_f

def get_v_set(self, _v_set: int = None) -> float:
_v_set = _v_set or self.read(R.V_SET)
def get_v_set(self, _v_set: Optional[int] = None) -> float:
_v_set = _v_set or self.read_single(R.V_SET)
self.v_set = _v_set / self.v_multi
return self.v_set

def set_v_set(self, v_set: float) -> float:
self.v_set = round(v_set * self.v_multi)
return self.write(R.V_SET, int(self.v_set))
return self.write_single(R.V_SET, int(self.v_set))

def get_i_set(self, _i_set: int = None) -> float:
_i_set = _i_set or self.read(R.I_SET)
def get_i_set(self, _i_set: Optional[int] = None) -> float:
_i_set = _i_set or self.read_single(R.I_SET)
self.i_set = _i_set / self.i_multi
return self.i_set

def set_i_set(self, i_set: float) -> float:
self.i_set = round(i_set * self.i_multi)
return self.write(R.I_SET, int(self.i_set))
return self.write_single(R.I_SET, int(self.i_set))

def get_v_out(self, _v_out: int = None) -> float:
_v_out = _v_out or self.read(R.V_OUT)
def get_v_out(self, _v_out: Optional[int] = None) -> float:
_v_out = _v_out or self.read_single(R.V_OUT)
self.v_out = _v_out / self.v_multi
return self.v_out

def get_i_out(self, _i_out: int = None) -> float:
_i_out = _i_out or self.read(R.I_OUT)
def get_i_out(self, _i_out: Optional[int] = None) -> float:
_i_out = _i_out or self.read_single(R.I_OUT)
self.i_out = _i_out / self.i_multi
return self.i_out

def get_p_out(self, _p_out: int = None) -> float:
_p_out = _p_out or self.read(R.P_OUT)
def get_p_out(self, _p_out: Optional[int] = None) -> float:
_p_out = _p_out or self.read_single(R.P_OUT)
self.p_out = _p_out / self.p_multi
return self.p_out

def get_v_in(self, _v_in: int = None) -> float:
_v_in = _v_in or self.read(R.V_IN)
def get_v_in(self, _v_in: Optional[int] = None) -> float:
_v_in = _v_in or self.read_single(R.V_IN)
self.v_in = _v_in / self.v_in_multi
return self.v_in

def is_keypad(self, _keypad: int = None) -> bool:
self.keypad = bool(_keypad or self.read(R.KEYPAD))
def is_keypad(self, _keypad: Optional[int] = None) -> bool:
self.keypad = bool(_keypad or self.read_single(R.KEYPAD))
return self.keypad

def get_ovp_ocp(self, _ovp_ocp: int = None) -> str:
_ovp_ocp = _ovp_ocp or self.read(R.OVP_OCP)
self.ovp_ocp = (
"OVP" if _ovp_ocp == 1 else "OCP" if _ovp_ocp == 2 else None
)
def get_ovp_ocp(self, _ovp_ocp: Optional[int] = None) -> Optional[str]:
return self.ovp_ocp

def get_cv_cc(self, _cv_cc: int = None) -> str:
_cv_cc = _cv_cc or self.read(R.CV_CC)
def get_cv_cc(self, _cv_cc: Optional[int] = None) -> Optional[str]:
_cv_cc = _cv_cc or self.read_single(R.CV_CC)
self.cv_cc = "CV" if _cv_cc == 0 else "CC" if _cv_cc == 1 else None
return self.cv_cc

def is_output(self, _output: int = None) -> bool:
self.output = bool(_output or self.read(R.OUTPUT))
def is_output(self, _output: Optional[int] = None) -> bool:
self.output = bool(_output or self.read_single(R.OUTPUT))
return self.output

def set_output(self, output: bool) -> None:
self.output = output
return self.write(R.OUTPUT, int(self.output))
self.write_single(R.OUTPUT, int(self.output))

def get_preset(self, _preset: int = None) -> int:
def get_preset(self, _preset: Optional[int] = None) -> int:
"Always returns 0 on my device, setter works as expected"
self.preset = _preset or self.read(R.PRESET)
self.preset = _preset or self.read_single(R.PRESET)
return self.preset

def set_preset(self, preset: int) -> int:
self.preset = preset
return self.write(R.PRESET, self.preset)
return self.write_single(R.PRESET, self.preset)

def is_bat_mode(self, _bat_mode: int = None) -> bool:
self.bat_mode = bool(_bat_mode or self.read(R.BAT_MODE))
def is_bat_mode(self, _bat_mode: Optional[int] = None) -> bool:
self.bat_mode = bool(_bat_mode or self.read_single(R.BAT_MODE))
return self.bat_mode

def get_v_bat(self, _v_bat: int = None) -> float:
_v_bat = _v_bat or self.read(R.V_BAT)
def get_v_bat(self, _v_bat: Optional[int] = None) -> float:
_v_bat = _v_bat or self.read_single(R.V_BAT)
self.v_bat = _v_bat / self.v_multi
return self.v_bat

def get_ext_c(self, _ext_c_s: int = None, _ext_c: int = None) -> int:
_ext_c_s = _ext_c_s or self.read(R.EXT_C_S)
_ext_c = _ext_c or self.read(R.EXT_C)
def get_ext_c(self,
_ext_c_s: Optional[int] = None,
_ext_c: Optional[int] = None) -> int:
_ext_c_s = _ext_c_s or self.read_single(R.EXT_C_S)
_ext_c = _ext_c or self.read_single(R.EXT_C)
sign = -1 if _ext_c_s else +1
self.ext_c = _ext_c * sign
return self.ext_c

def get_ext_f(self, _ext_f_s: int = None, _ext_f: int = None) -> int:
_ext_f_s = _ext_f_s or self.read(R.EXT_F_S)
_ext_f = _ext_f or self.read(R.EXT_F)
def get_ext_f(self,
_ext_f_s: Optional[int] = None,
_ext_f: Optional[int] = None) -> int:
_ext_f_s = _ext_f_s or self.read_single(R.EXT_F_S)
_ext_f = _ext_f or self.read_single(R.EXT_F)
sign = -1 if _ext_f_s else +1
self.ext_f = _ext_f * sign
return self.ext_f

def get_ah(self, _ah_h: int = None, _ah_l: int = None) -> float:
_ah_h = _ah_h or self.read(R.AH_H)
_ah_l = _ah_l or self.read(R.AH_L)
def get_ah(self,
_ah_h: Optional[int] = None,
_ah_l: Optional[int] = None) -> float:
_ah_h = _ah_h or self.read_single(R.AH_H)
_ah_l = _ah_l or self.read_single(R.AH_L)
self.ah = (_ah_h << 16 | _ah_l) / 1000
return self.ah

def get_wh(self, _wh_h: int = None, _wh_l: int = None) -> float:
_wh_h = _wh_h or self.read(R.WH_H)
_wh_l = _wh_l or self.read(R.WH_L)
def get_wh(self,
_wh_h: Optional[int] = None,
_wh_l: Optional[int] = None) -> float:
_wh_h = _wh_h or self.read_single(R.WH_H)
_wh_l = _wh_l or self.read_single(R.WH_L)
self.wh = (_wh_h << 16 | _wh_l) / 1000
return self.wh

Expand All @@ -274,69 +297,69 @@ def get_date_time(self) -> datetime:
self.datetime = datetime(d[0], d[1], d[2], d[3], d[4], d[5])
return self.datetime

def set_date_time(self, d: datetime) -> int:
def set_date_time(self, d: datetime) -> tuple:
return self.write_multiple(
R.YEAR, (d.year, d.month, d.day, d.hour, d.minute, d.second)
)

def is_take_ok(self, _take_ok: int = None) -> bool:
self.take_ok = bool(_take_ok or self.read(R.OPT_TAKE_OK))
def is_take_ok(self, _take_ok: Optional[int] = None) -> bool:
self.take_ok = bool(_take_ok or self.read_single(R.OPT_TAKE_OK))
return self.take_ok

def set_take_ok(self, take_ok: bool) -> bool:
def set_take_ok(self, take_ok: bool) -> int:
self.take_ok = take_ok
return self.write(R.OPT_TAKE_OK, self.take_ok)
return self.write_single(R.OPT_TAKE_OK, self.take_ok)

def is_take_out(self, _take_out: int = None) -> bool:
self.take_out = bool(_take_out or self.read(R.OPT_TAKE_OUT))
def is_take_out(self, _take_out: Optional[int] = None) -> bool:
self.take_out = bool(_take_out or self.read_single(R.OPT_TAKE_OUT))
return self.take_out

def set_take_out(self, take_out: bool) -> bool:
def set_take_out(self, take_out: bool) -> int:
self.take_out = take_out
return self.write(R.OPT_TAKE_OUT, self.take_out)
return self.write_single(R.OPT_TAKE_OUT, self.take_out)

def is_boot_pow(self, _boot_pow: int = None) -> bool:
self.boot_pow = bool(_boot_pow or self.read(R.OPT_BOOT_POW))
def is_boot_pow(self, _boot_pow: Optional[int] = None) -> bool:
self.boot_pow = bool(_boot_pow or self.read_single(R.OPT_BOOT_POW))
return self.boot_pow

def set_boot_pow(self, boot_pow: bool) -> bool:
def set_boot_pow(self, boot_pow: bool) -> int:
self.boot_pow = boot_pow
return self.write(R.OPT_BOOT_POW, self.boot_pow)
return self.write_single(R.OPT_BOOT_POW, self.boot_pow)

def is_buzz(self, _buzz: int = None) -> bool:
self.buzz = bool(_buzz or self.read(R.OPT_BUZZ))
def is_buzz(self, _buzz: Optional[int] = None) -> int:
self.buzz = bool(_buzz or self.read_single(R.OPT_BUZZ))
return self.buzz

def set_buzz(self, buzz: bool) -> bool:
def set_buzz(self, buzz: bool) -> int:
self.buzz = buzz
return self.write(R.OPT_BUZZ, self.buzz)
return self.write_single(R.OPT_BUZZ, self.buzz)

def is_logo(self, _logo: int = None) -> bool:
self.logo = bool(_logo or self.read(R.OPT_LOGO))
def is_logo(self, _logo: Optional[int] = None) -> bool:
self.logo = bool(_logo or self.read_single(R.OPT_LOGO))
return self.logo

def set_logo(self, logo: bool) -> bool:
def set_logo(self, logo: bool) -> int:
self.logo = logo
return self.write(R.OPT_LOGO, self.logo)
return self.write_single(R.OPT_LOGO, self.logo)

def get_lang(self) -> int:
self.lang = self.read(R.OPT_LANG)
self.lang = self.read_single(R.OPT_LANG)
return self.lang

def set_lang(self, lang: int) -> int:
self.lang = lang
return self.write(R.OPT_LANG, self.lang)
return self.write_single(R.OPT_LANG, self.lang)

def get_light(self) -> int:
self.light = self.read(R.OPT_LIGHT)
self.light = self.read_single(R.OPT_LIGHT)
return self.light

def set_light(self, light: int) -> int:
self.light = light
return self.write(R.OPT_LIGHT, self.light)
return self.write_single(R.OPT_LIGHT, self.light)

def reboot_bootloader(self) -> None:
try:
self.write(R.SYSTEM, R.BOOTLOADER)
self.write_single(R.SYSTEM, R.BOOTLOADER)
except ModbusInvalidResponseError:
pass

0 comments on commit 2f2ad75

Please sign in to comment.