Skip to content

Commit

Permalink
feat: add first version (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Nov 27, 2022
1 parent aa8e63e commit e34423e
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 0 deletions.
30 changes: 30 additions & 0 deletions info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import asyncio
import logging

from usb_devices import BluetoothDevice

logging.basicConfig(level=logging.INFO)
logging.getLogger("usb_devices").setLevel(logging.DEBUG)


async def run() -> None:
loop = asyncio.get_running_loop()
for i in range(0, 9):
dev = BluetoothDevice(i)
try:
await loop.run_in_executor(None, dev.setup)
except FileNotFoundError:
print(f"hci{i} not found")
continue
assert dev.usb_device is not None # nosec
print(
f"hci{i} manufacturer: {dev.usb_device.manufacturer}, "
f"product: {dev.usb_device.product}, "
f"vendor_id: {dev.usb_device.vendor_id}, "
f"product_id: {dev.usb_device.product_id}, "
f"bus_id: {dev.usb_device.bus_id}, "
f"dev_num: {dev.usb_device.dev_num}"
)


asyncio.run(run())
18 changes: 18 additions & 0 deletions reset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import asyncio
import logging

from usb_devices import BluetoothDevice

logging.basicConfig(level=logging.INFO)
logging.getLogger("usb_devices").setLevel(logging.DEBUG)


async def run() -> None:
dev = BluetoothDevice(0)
if await dev.async_reset():
print("Reset of hci0 succeeded")
else:
print("Reset of hci0 failed")


asyncio.run(run())
117 changes: 117 additions & 0 deletions src/usb_devices/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,118 @@
from __future__ import annotations

__version__ = "0.0.3"

import asyncio
from fcntl import ioctl
from pathlib import Path

BLUETOOTH_DEVICE_PATH = Path("/sys/class/bluetooth")
USB_DEVICE_PATH = Path("/sys/bus/usb/devices")
USB_DEVFS_PATH = Path("/dev/bus/usb")
# _IO('U', 20) constant in the linux kernel.
USBDEVFS_RESET = ord("U") << (4 * 2) | 20

__all__ = [
"USBDevice",
"BluetoothDevice",
]


class BluetoothDevice:

__slots__ = ("hci", "path", "device_path", "usb_device")

def __init__(self, hci: int) -> None:
"""Initialize a BluetoothDevice object."""
self.hci = hci
self.path = BLUETOOTH_DEVICE_PATH / f"hci{self.hci}"
self.device_path = self.path / "device"
self.usb_device: USBDevice | None = None

async def async_setup(self) -> None:
"""Set up a Bluetooth device."""
await asyncio.get_running_loop().run_in_executor(None, self.setup)

async def async_reset(self) -> bool:
"""Reset a Bluetooth device."""
return await asyncio.get_running_loop().run_in_executor(None, self.reset)

def reset(self) -> bool:
"""Reset a Bluetooth device."""
if self.usb_device is None:
self.setup()
assert self.usb_device is not None # nosec
return self.usb_device.reset()

def setup(self) -> None:
"""Create a USBDevice object."""
path = self.device_path.readlink()
self.usb_device = USBDevice(path.parts[-1])
self.usb_device.setup()


class USBDevice:

__slots__ = (
"id_str",
"bus_port_id",
"bus_id",
"port_id",
"interface_id",
"manufacturer",
"product",
"product_id",
"vendor_id",
"dev_num",
"usb_devfs_path",
"path",
)
_files = {
"manufacturer": "manufacturer",
"product": "product",
"product_id": "idProduct",
"vendor_id": "idVendor",
"dev_num": "devnum",
}

def __init__(self, id_str: str) -> None:
"""Initialize a USBDevice object."""
self.id_str = id_str # 1-1.2.2:1.0
bus_port_id, interface_id = id_str.split(":")
self.bus_port_id = bus_port_id
bus_id, port_id = bus_port_id.split("-")
self.bus_id = bus_id
self.port_id = port_id
self.interface_id = interface_id
self.manufacturer: str | None = None
self.product: str | None = None
self.product_id: str | None = None
self.vendor_id: str | None = None
self.dev_num: str | None = None
self.path = USB_DEVICE_PATH / bus_port_id
self.usb_devfs_path: Path | None = None

async def async_setup(self) -> None:
"""Set up a USB device."""
await asyncio.get_running_loop().run_in_executor(None, self.setup)

async def async_reset(self) -> bool:
"""Reset the USB device."""
return await asyncio.get_running_loop().run_in_executor(None, self.reset)

def setup(self) -> None:
"""Read the USB device."""
for key, value in self._files.items():
setattr(self, key, self.path.joinpath(value).read_text().strip())
assert self.dev_num is not None # nosec
self.usb_devfs_path = (
USB_DEVFS_PATH / f"{int(self.bus_id):03}" / f"{int(self.dev_num):03}"
)

def reset(self) -> bool:
"""Reset the USB device."""
if self.usb_devfs_path is None:
self.setup()
assert self.usb_devfs_path is not None # nosec
with self.usb_devfs_path.open("w") as usb_dev:
return ioctl(usb_dev, USBDEVFS_RESET, 0) > -1

0 comments on commit e34423e

Please sign in to comment.