Source code for usb_devices

from __future__ import annotations

__version__ = "0.5.1"

import asyncio

try:
    from fcntl import ioctl
except ImportError:
    ioctl = None  # type: ignore

from pathlib import Path

BLUETOOTH_DEVICE_PATH = Path("/sys/class/bluetooth")
USB_DEVICE_PATH = Path("/sys/bus/usb/devices")
USB_DEVFS_PATH = Path("/dev/bus/usb")
# _IO('U', 20) constant in the linux kernel.
USBDEVFS_RESET = ord("U") << (4 * 2) | 20


[docs] class NotAUSBDeviceError(ValueError): """Raised when a device is not a USB device."""
__all__ = [ "USBDevice", "BluetoothDevice", "NotAUSBDeviceError", ]
[docs] class BluetoothDevice: __slots__ = ("hci", "path", "device_path", "usb_device") def __init__(self, hci: int) -> None: """Initialize a BluetoothDevice object.""" self.hci = hci self.path = BLUETOOTH_DEVICE_PATH / f"hci{self.hci}" self.device_path = self.path / "device" self.usb_device: USBDevice | None = None
[docs] async def async_setup(self) -> None: """Set up a Bluetooth device.""" await asyncio.get_running_loop().run_in_executor(None, self.setup)
[docs] async def async_reset(self) -> bool: """Reset a Bluetooth device.""" return await asyncio.get_running_loop().run_in_executor(None, self.reset)
[docs] def reset(self) -> bool: """Reset a Bluetooth device.""" if self.usb_device is None: self.setup() assert self.usb_device is not None # nosec return self.usb_device.reset()
[docs] def setup(self) -> None: """Create a USBDevice object.""" path = self.device_path.readlink() self.usb_device = USBDevice(path.parts[-1]) self.usb_device.setup()
[docs] class USBDevice: __slots__ = ( "id_str", "bus_port_id", "bus_id", "port_id", "interface_id", "manufacturer", "product", "product_id", "vendor_id", "dev_num", "usb_devfs_path", "path", ) _files = { "manufacturer": "manufacturer", "product": "product", "product_id": "idProduct", "vendor_id": "idVendor", "dev_num": "devnum", } def __init__(self, id_str: str) -> None: """Initialize a USBDevice object.""" if ":" not in id_str or "-" not in id_str: raise NotAUSBDeviceError(f"{id_str} is not a USB device") self.id_str = id_str # 1-1.2.2:1.0 bus_port_id, interface_id = id_str.split(":") self.bus_port_id = bus_port_id bus_id, port_id = bus_port_id.split("-") self.bus_id = bus_id self.port_id = port_id self.interface_id = interface_id self.manufacturer: str | None = None self.product: str | None = None self.product_id: str | None = None self.vendor_id: str | None = None self.dev_num: str | None = None self.path = USB_DEVICE_PATH / bus_port_id self.usb_devfs_path: Path | None = None
[docs] async def async_setup(self) -> None: """Set up a USB device.""" await asyncio.get_running_loop().run_in_executor(None, self.setup)
[docs] async def async_reset(self) -> bool: """Reset the USB device.""" return await asyncio.get_running_loop().run_in_executor(None, self.reset)
[docs] def setup(self) -> None: """Read the USB device.""" for key, value in self._files.items(): try: setattr( self, key, self.path.joinpath(value) .read_text(encoding="utf-8", errors="replace") .strip(), ) except FileNotFoundError: if key not in ("manufacturer", "product"): raise self.product = self.product or self.product_id self.manufacturer = self.manufacturer or self.vendor_id assert self.dev_num is not None # nosec self.usb_devfs_path = ( USB_DEVFS_PATH / f"{int(self.bus_id):03}" # noqa / f"{int(self.dev_num):03}" # noqa )
[docs] def reset(self) -> bool: """Reset the USB device.""" if ioctl is None: return False # type: ignore if self.usb_devfs_path is None: self.setup() assert self.usb_devfs_path is not None # nosec with self.usb_devfs_path.open("w") as usb_dev: return ioctl(usb_dev, USBDEVFS_RESET, 0) > -1