diff --git a/info.py b/info.py new file mode 100644 index 0000000..b726170 --- /dev/null +++ b/info.py @@ -0,0 +1,30 @@ +import asyncio +import logging + +from usb_devices import BluetoothDevice + +logging.basicConfig(level=logging.INFO) +logging.getLogger("usb_devices").setLevel(logging.DEBUG) + + +async def run() -> None: + loop = asyncio.get_running_loop() + for i in range(0, 9): + dev = BluetoothDevice(i) + try: + await loop.run_in_executor(None, dev.setup) + except FileNotFoundError: + print(f"hci{i} not found") + continue + assert dev.usb_device is not None # nosec + print( + f"hci{i} manufacturer: {dev.usb_device.manufacturer}, " + f"product: {dev.usb_device.product}, " + f"vendor_id: {dev.usb_device.vendor_id}, " + f"product_id: {dev.usb_device.product_id}, " + f"bus_id: {dev.usb_device.bus_id}, " + f"dev_num: {dev.usb_device.dev_num}" + ) + + +asyncio.run(run()) diff --git a/reset.py b/reset.py new file mode 100644 index 0000000..6ca5b31 --- /dev/null +++ b/reset.py @@ -0,0 +1,18 @@ +import asyncio +import logging + +from usb_devices import BluetoothDevice + +logging.basicConfig(level=logging.INFO) +logging.getLogger("usb_devices").setLevel(logging.DEBUG) + + +async def run() -> None: + dev = BluetoothDevice(0) + if await dev.async_reset(): + print("Reset of hci0 succeeded") + else: + print("Reset of hci0 failed") + + +asyncio.run(run()) diff --git a/src/usb_devices/__init__.py b/src/usb_devices/__init__.py index 27fdca4..2085cee 100644 --- a/src/usb_devices/__init__.py +++ b/src/usb_devices/__init__.py @@ -1 +1,118 @@ +from __future__ import annotations + __version__ = "0.0.3" + +import asyncio +from fcntl import ioctl +from pathlib import Path + +BLUETOOTH_DEVICE_PATH = Path("/sys/class/bluetooth") +USB_DEVICE_PATH = Path("/sys/bus/usb/devices") +USB_DEVFS_PATH = Path("/dev/bus/usb") +# _IO('U', 20) constant in the linux kernel. +USBDEVFS_RESET = ord("U") << (4 * 2) | 20 + +__all__ = [ + "USBDevice", + "BluetoothDevice", +] + + +class BluetoothDevice: + + __slots__ = ("hci", "path", "device_path", "usb_device") + + def __init__(self, hci: int) -> None: + """Initialize a BluetoothDevice object.""" + self.hci = hci + self.path = BLUETOOTH_DEVICE_PATH / f"hci{self.hci}" + self.device_path = self.path / "device" + self.usb_device: USBDevice | None = None + + async def async_setup(self) -> None: + """Set up a Bluetooth device.""" + await asyncio.get_running_loop().run_in_executor(None, self.setup) + + async def async_reset(self) -> bool: + """Reset a Bluetooth device.""" + return await asyncio.get_running_loop().run_in_executor(None, self.reset) + + def reset(self) -> bool: + """Reset a Bluetooth device.""" + if self.usb_device is None: + self.setup() + assert self.usb_device is not None # nosec + return self.usb_device.reset() + + def setup(self) -> None: + """Create a USBDevice object.""" + path = self.device_path.readlink() + self.usb_device = USBDevice(path.parts[-1]) + self.usb_device.setup() + + +class USBDevice: + + __slots__ = ( + "id_str", + "bus_port_id", + "bus_id", + "port_id", + "interface_id", + "manufacturer", + "product", + "product_id", + "vendor_id", + "dev_num", + "usb_devfs_path", + "path", + ) + _files = { + "manufacturer": "manufacturer", + "product": "product", + "product_id": "idProduct", + "vendor_id": "idVendor", + "dev_num": "devnum", + } + + def __init__(self, id_str: str) -> None: + """Initialize a USBDevice object.""" + self.id_str = id_str # 1-1.2.2:1.0 + bus_port_id, interface_id = id_str.split(":") + self.bus_port_id = bus_port_id + bus_id, port_id = bus_port_id.split("-") + self.bus_id = bus_id + self.port_id = port_id + self.interface_id = interface_id + self.manufacturer: str | None = None + self.product: str | None = None + self.product_id: str | None = None + self.vendor_id: str | None = None + self.dev_num: str | None = None + self.path = USB_DEVICE_PATH / bus_port_id + self.usb_devfs_path: Path | None = None + + async def async_setup(self) -> None: + """Set up a USB device.""" + await asyncio.get_running_loop().run_in_executor(None, self.setup) + + async def async_reset(self) -> bool: + """Reset the USB device.""" + return await asyncio.get_running_loop().run_in_executor(None, self.reset) + + def setup(self) -> None: + """Read the USB device.""" + for key, value in self._files.items(): + setattr(self, key, self.path.joinpath(value).read_text().strip()) + assert self.dev_num is not None # nosec + self.usb_devfs_path = ( + USB_DEVFS_PATH / f"{int(self.bus_id):03}" / f"{int(self.dev_num):03}" + ) + + def reset(self) -> bool: + """Reset the USB device.""" + if self.usb_devfs_path is None: + self.setup() + assert self.usb_devfs_path is not None # nosec + with self.usb_devfs_path.open("w") as usb_dev: + return ioctl(usb_dev, USBDEVFS_RESET, 0) > -1