422 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			422 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #! /usr/bin/python3 -sP
 | |
| # SPDX-License-Identifier: GPL-2.0
 | |
| # -*- coding: utf-8 -*-
 | |
| #
 | |
| # Copyright (c) 2017 Benjamin Tissoires <benjamin.tissoires@gmail.com>
 | |
| # Copyright (c) 2017 Red Hat, Inc.
 | |
| #
 | |
| # This program is free software: you can redistribute it and/or modify
 | |
| # it under the terms of the GNU General Public License as published by
 | |
| # the Free Software Foundation; either version 2 of the License, or
 | |
| # (at your option) any later version.
 | |
| #
 | |
| # This program is distributed in the hope that it will be useful,
 | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| # GNU General Public License for more details.
 | |
| #
 | |
| # You should have received a copy of the GNU General Public License
 | |
| # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| import fcntl
 | |
| import functools
 | |
| import libevdev
 | |
| import os
 | |
| 
 | |
| try:
 | |
|     import pyudev
 | |
| except ImportError:
 | |
|     raise ImportError("UHID is not supported due to missing pyudev dependency")
 | |
| 
 | |
| import logging
 | |
| 
 | |
| import hidtools.hid as hid
 | |
| from hidtools.uhid import UHIDDevice
 | |
| from hidtools.util import BusType
 | |
| 
 | |
| from pathlib import Path
 | |
| from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union
 | |
| 
 | |
| logger = logging.getLogger("hidtools.device.base_device")
 | |
| 
 | |
| 
 | |
class SysfsFile:
    """Accessor for one attribute file, addressed by ``path``.

    The file is re-opened on every access, so each read returns the
    content as it is at that moment and each write goes straight to disk.
    """

    def __init__(self, path):
        self.path = path

    def __set_value(self, value):
        # The value is formatted with str() semantics and newline-terminated.
        payload = f"{value}\n"
        with open(self.path, "w") as handle:
            written = handle.write(payload)
        return written

    def __get_value(self):
        with open(self.path) as handle:
            raw = handle.read()
        # Surrounding whitespace (including the trailing newline) is dropped.
        return raw.strip()

    @property
    def int_value(self) -> int:
        """Content of the file parsed as a base-10 integer."""
        text = self.__get_value()
        return int(text)

    @int_value.setter
    def int_value(self, v: int) -> None:
        self.__set_value(v)

    @property
    def str_value(self) -> str:
        """Content of the file as a stripped string."""
        text = self.__get_value()
        return text

    @str_value.setter
    def str_value(self, v: str) -> None:
        self.__set_value(v)
 | |
| 
 | |
| 
 | |
class LED:
    """Wraps the ``max_brightness`` and ``brightness`` files under ``sys_path``.

    ``sys_path`` must support the ``/`` operator (e.g. a ``pathlib.Path``).
    ``max_brightness`` is read once, at construction time; ``brightness``
    is read from / written to its file on every access.
    """

    def __init__(self, sys_path):
        max_file = SysfsFile(sys_path / "max_brightness")
        self.max_brightness = max_file.int_value
        self.__brightness = SysfsFile(sys_path / "brightness")

    @property
    def brightness(self) -> int:
        """Current integer value of the ``brightness`` file."""
        current = self.__brightness.int_value
        return current

    @brightness.setter
    def brightness(self, value: int) -> None:
        brightness_file = self.__brightness
        brightness_file.int_value = value
 | |
| 
 | |
| 
 | |
class PowerSupply:
    """Represents Linux power_supply_class sysfs nodes.

    Exposes the ``capacity``, ``status`` and ``type`` files found under
    ``sys_path`` as read-only properties; each access re-reads the file.
    """

    def __init__(self, sys_path):
        # One SysfsFile per attribute, stored as _capacity / _status / _type.
        for attribute in ("capacity", "status", "type"):
            setattr(self, f"_{attribute}", SysfsFile(sys_path / attribute))

    @property
    def capacity(self) -> int:
        """Content of the ``capacity`` file as an integer."""
        node = self._capacity
        return node.int_value

    @property
    def status(self) -> str:
        """Content of the ``status`` file as a stripped string."""
        node = self._status
        return node.str_value

    @property
    def type(self) -> str:
        """Content of the ``type`` file as a stripped string."""
        node = self._type
        return node.str_value
 | |
| 
 | |
| 
 | |
class HIDIsReady:
    """
    Companion object attached to a uhid device that reports whether
    the device is ready, by binding to some kernel mechanism.

    This base implementation never reports ready; see :meth:`is_ready`.
    """

    def __init__(self: "HIDIsReady", uhid: UHIDDevice) -> None:
        # Keep a handle on the device whose readiness is being tracked.
        self.uhid = uhid

    def is_ready(self: "HIDIsReady") -> bool:
        """
        Tell whether the attached uhid device is ready.

        Meant to be overridden in subclasses; the default answer is
        always False.
        """
        return False
 | |
| 
 | |
| 
 | |
class UdevHIDIsReady(HIDIsReady):
    """Readiness tracker driven by udev events of the "hid" subsystem.

    A single pyudev context/monitor pair is shared by all instances (it is
    stored on the class and created on first instantiation). Incoming
    events update the class-level ``_uhid_devices`` table, which maps a
    numeric HID id to ``(ready, bind_count)``.
    """

    _pyudev_context: ClassVar[Optional[pyudev.Context]] = None
    _pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None
    _uhid_devices: ClassVar[Dict[int, Tuple[bool, int]]] = {}

    def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None:
        super().__init__(uhid)
        self._init_pyudev()

    @classmethod
    def _init_pyudev(cls: Type["UdevHIDIsReady"]) -> None:
        """Create and start the shared udev monitor once, and register its
        file descriptor with the UHIDDevice poll loop."""
        if cls._pyudev_context is None:
            cls._pyudev_context = pyudev.Context()
            cls._pyudev_monitor = pyudev.Monitor.from_netlink(cls._pyudev_context)
            cls._pyudev_monitor.filter_by("hid")
            cls._pyudev_monitor.start()

            UHIDDevice._append_fd_to_poll(
                cls._pyudev_monitor.fileno(), cls._cls_udev_event_callback
            )

    @classmethod
    def _cls_udev_event_callback(cls: Type["UdevHIDIsReady"]) -> None:
        """Drain the pending udev events and update ``_uhid_devices``.

        Only "bind", "remove" and "unbind" actions are taken into account:
        "bind" marks the device ready, the other two mark it not ready.
        The per-device counter is incremented on each not-ready -> ready
        transition.
        """
        if cls._pyudev_monitor is None:
            return
        event: pyudev.Device
        # poll() is called repeatedly with a 0.02 timeout until it returns
        # None, i.e. until no more events are queued.
        for event in iter(functools.partial(cls._pyudev_monitor.poll, 0.02), None):
            if event.action not in ["bind", "remove", "unbind"]:
                # Skip unrelated actions but keep draining: returning here
                # would leave the events queued behind this one unprocessed
                # until the next callback invocation.
                continue

            logger.debug(f"udev event: {event.action} -> {event}")

            # The key is the hexadecimal number following the last "." of
            # the sysfs path.
            hid_id = int(event.sys_path.strip().split(".")[-1], 16)

            device_ready, count = cls._uhid_devices.get(hid_id, (False, 0))

            ready = event.action == "bind"
            if not device_ready and ready:
                count += 1
            cls._uhid_devices[hid_id] = (ready, count)

    def is_ready(self: "UdevHIDIsReady") -> Tuple[bool, int]:
        """Return ``(ready, bind_count)`` for the attached uhid device.

        Note that, unlike the base class, this returns a tuple. A device
        for which no event has been recorded yields ``(False, 0)``.
        """
        return self._uhid_devices.get(self.uhid.hid_id, (False, 0))
 | |
| 
 | |
| 
 | |
class EvdevMatch(object):
    """Set of evdev capabilities/properties a device must or must not have.

    All arguments are keyword-only and optional; an omitted argument
    behaves as an empty list.

    :param requires: items that ``evdev.has()`` must all report.
    :param excludes: items that ``evdev.has()`` must not report.
    :param req_properties: items that ``evdev.has_property()`` must all report.
    :param excl_properties: items that ``evdev.has_property()`` must not report.
    """

    def __init__(
        self: "EvdevMatch",
        *,
        requires: Optional[List[Any]] = None,
        excludes: Optional[List[Any]] = None,
        req_properties: Optional[List[Any]] = None,
        excl_properties: Optional[List[Any]] = None,
    ) -> None:
        # A fresh list is created per instance when an argument is omitted:
        # a literal [] default would be one shared object, so mutating the
        # attribute on one instance would leak into every other instance.
        self.requires = [] if requires is None else requires
        self.excludes = [] if excludes is None else excludes
        self.req_properties = [] if req_properties is None else req_properties
        self.excl_properties = [] if excl_properties is None else excl_properties

    def is_a_match(self: "EvdevMatch", evdev: "libevdev.Device") -> bool:
        """Return True when ``evdev`` satisfies every rule of this match.

        The rules are checked in order (requires, excludes, required
        properties, excluded properties) and evaluation stops at the first
        one that fails.
        """
        return (
            all(evdev.has(m) for m in self.requires)
            and not any(evdev.has(m) for m in self.excludes)
            and all(evdev.has_property(p) for p in self.req_properties)
            and not any(evdev.has_property(p) for p in self.excl_properties)
        )
 | |
| 
 | |
| 
 | |
class EvdevDevice(object):
    """
    Represents an Evdev node and its properties.
    This is a stub for the libevdev devices, as they are relying on
    uevent to get the data, saving us some ioctls to fetch the names
    and properties.
    """

    def __init__(self: "EvdevDevice", sysfs: Path) -> None:
        """Parse the parent's ``uevent`` file and open the evdev node.

        :param sysfs: sysfs path of the event node; its last component is
            used as the node name under ``/dev/input`` and its parent
            directory must contain a ``uevent`` file.
        """
        self.sysfs = sysfs
        self.event_node: Any = None
        self.libevdev: Optional["libevdev.Device"] = None

        self.uevents = {}
        # all of the interesting properties are stored in the input uevent, so in the parent
        # so convert the uevent file of the parent input node into a dict
        with open(sysfs.parent / "uevent") as f:
            for line in f:
                # Split on the first "=" only, so that values which contain
                # "=" themselves are kept whole; lines without any "="
                # (e.g. blank lines) are skipped instead of raising.
                key, sep, value = line.strip().partition("=")
                if not sep:
                    continue
                self.uevents[key] = value.strip('"')

        # we open all evdev nodes in order to not miss any event
        self.open()

    @property
    def name(self: "EvdevDevice") -> str:
        """Value of the ``NAME`` entry of the parsed uevent file."""
        assert "NAME" in self.uevents

        return self.uevents["NAME"]

    @property
    def evdev(self: "EvdevDevice") -> Path:
        """Path of the device node: ``/dev/input/<last component of sysfs>``."""
        return Path("/dev/input") / self.sysfs.name

    def matches_application(
        self: "EvdevDevice", application: str, matches: Dict[str, "EvdevMatch"]
    ) -> bool:
        """Tell whether this node satisfies the rule registered for ``application``.

        :param application: key to look up in ``matches``.
        :param matches: mapping from application name to its match rule.
        :returns: False when the node is not open, otherwise the result of
            the rule's ``is_a_match()``.
        :raises AssertionError: when ``application`` is not in ``matches``.
        """
        if self.libevdev is None:
            return False

        if application in matches:
            return matches[application].is_a_match(self.libevdev)

        logger.error(
            f"application '{application}' is unknown, please update/fix hid-tools"
        )
        # hid-tools likely needs an update. Raised explicitly (rather than
        # `assert False`) so that the failure is not stripped under `-O`.
        raise AssertionError(f"application '{application}' is unknown")

    def open(self: "EvdevDevice") -> "libevdev.Device":
        """Open the device node read-only, wrap it in a libevdev device and
        switch its file descriptor to non-blocking mode.

        :returns: the newly created libevdev device.
        """
        node = open(self.evdev, "rb")
        try:
            device = libevdev.Device(node)
        except Exception:
            # Do not leak the file object when libevdev rejects the node.
            node.close()
            raise

        self.event_node = node
        self.libevdev = device

        assert self.libevdev.fd is not None

        fd = self.libevdev.fd.fileno()
        # O_NONBLOCK is a file *status* flag: it has to be combined with the
        # value read through F_GETFL (not F_GETFD, which returns the file
        # *descriptor* flags) before being written back with F_SETFL.
        flag = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)

        return self.libevdev

    def close(self: "EvdevDevice") -> None:
        """Close the libevdev file object and the event node, if any, and
        forget about them."""
        if self.libevdev is not None and self.libevdev.fd is not None:
            self.libevdev.fd.close()
            self.libevdev = None
        if self.event_node is not None:
            self.event_node.close()
            self.event_node = None
 | |
| 
 | |
| 
 | |
class BaseDevice(UHIDDevice):
    """UHID device that tracks kernel readiness and its evdev input nodes.

    Subclasses are expected to provide ``_application_matches`` so that
    :meth:`get_evdev` can pick the right node when several are present.
    """

    # default _application_matches that matches nothing. This needs
    # to be set in the subclasses to have get_evdev() working
    _application_matches: Dict[str, EvdevMatch] = {}

    def __init__(
        self,
        name,
        application,
        rdesc_str: Optional[str] = None,
        rdesc: Optional[Union[hid.ReportDescriptor, str, bytes]] = None,
        input_info=None,
    ) -> None:
        """
        :param name: device name; when None, a name built from the class
            name is used.
        :param application: default application used by :meth:`get_evdev`.
        :param rdesc_str: human-readable report descriptor, only used when
            ``rdesc`` is None.
        :param rdesc: report descriptor; takes precedence over ``rdesc_str``.
        :param input_info: device info tuple; defaults to
            ``(BusType.USB, 1, 2)``.
        :raises Exception: when neither ``rdesc`` nor ``rdesc_str`` is given.
        """
        # The readiness tracker is created before the parent initialisation.
        self._kernel_is_ready: HIDIsReady = UdevHIDIsReady(self)
        if rdesc_str is None and rdesc is None:
            raise Exception("Please provide at least a rdesc or rdesc_str")
        super().__init__()
        if name is None:
            name = f"uhid gamepad test {self.__class__.__name__}"
        if input_info is None:
            input_info = (BusType.USB, 1, 2)
        self.name = name
        self.info = input_info
        self.default_reportID = None
        self.opened = False
        self.started = False
        self.application = application
        # Lazily filled by the input_nodes property.
        self._input_nodes: Optional[List[EvdevDevice]] = None
        if rdesc is None:
            assert rdesc_str is not None
            self.rdesc = hid.ReportDescriptor.from_human_descr(rdesc_str)  # type: ignore
        else:
            self.rdesc = rdesc  # type: ignore

    @property
    def power_supply_class(self: "BaseDevice") -> Optional[PowerSupply]:
        """First power supply found through ``walk_sysfs()``, or None."""
        ps = self.walk_sysfs("power_supply", "power_supply/*")
        if ps is None or len(ps) < 1:
            return None

        return PowerSupply(ps[0])

    @property
    def led_classes(self: "BaseDevice") -> List[LED]:
        """One :class:`LED` per ``max_brightness`` file found through
        ``walk_sysfs()`` (empty list when nothing is found)."""
        leds = self.walk_sysfs("led", "**/max_brightness")
        if leds is None:
            return []

        # Each match is the max_brightness file itself; the LED directory
        # is its parent.
        return [LED(led.parent) for led in leds]

    @property
    def kernel_is_ready(self: "BaseDevice") -> bool:
        """True when the readiness tracker reports ready and the device
        has been started."""
        return self._kernel_is_ready.is_ready()[0] and self.started

    @property
    def kernel_ready_count(self: "BaseDevice") -> int:
        """Counter reported by the readiness tracker (second item of the
        tuple returned by its ``is_ready()``)."""
        return self._kernel_is_ready.is_ready()[1]

    @property
    def input_nodes(self: "BaseDevice") -> List[EvdevDevice]:
        """Evdev nodes of this device.

        The list is built (and every node opened) on first access once the
        device is ready, then cached. Before that, an empty list is
        returned and nothing is cached.
        """
        if self._input_nodes is not None:
            return self._input_nodes

        if not self.kernel_is_ready or not self.started:
            return []

        self._input_nodes = [
            EvdevDevice(path)
            for path in self.walk_sysfs("input", "input/input*/event*")
        ]
        return self._input_nodes

    def match_evdev_rule(self, application, evdev):
        """Replace this in subclasses if the device has multiple reports
        of the same type and we need to filter based on the actual evdev
        node.

        returning True will append the corresponding report to
        `self.input_nodes[type]`
        returning False  will ignore this report / type combination
        for the device.
        """
        return True

    def open(self):
        """Mark the device as opened."""
        self.opened = True

    def _close_all_opened_evdev(self):
        """Close every evdev node created so far (the cached list itself
        is left in place)."""
        # getattr(): this is also reached from __del__, which runs even when
        # __init__ raised before the attribute was assigned.
        nodes = getattr(self, "_input_nodes", None)
        if nodes is not None:
            for e in nodes:
                e.close()

    def __del__(self):
        self._close_all_opened_evdev()

    def close(self):
        """Mark the device as not opened."""
        self.opened = False

    def start(self, flags):
        """Mark the device as started (``flags`` is not used here)."""
        self.started = True

    def stop(self):
        """Mark the device as stopped and close its evdev nodes."""
        self.started = False
        self._close_all_opened_evdev()

    def next_sync_events(self, application=None):
        """Return the list of events currently available on the evdev node
        matching ``application``, or an empty list when there is no such
        node."""
        evdev = self.get_evdev(application)
        if evdev is not None:
            return list(evdev.events())
        return []

    @property
    def application_matches(self: "BaseDevice") -> Dict[str, EvdevMatch]:
        """Mapping from application name to the rule selecting its node."""
        return self._application_matches

    @application_matches.setter
    def application_matches(self: "BaseDevice", data: Dict[str, EvdevMatch]) -> None:
        self._application_matches = data

    def get_evdev(self, application=None):
        """Return the libevdev device to use for ``application``.

        :param application: application name; defaults to
            ``self.application`` when None.
        :returns: the libevdev device of the selected node, or None when
            no node is available or none is accepted.

        With a single node, only :meth:`match_evdev_rule` is consulted;
        with several, a node must also satisfy ``application_matches``.
        """
        if application is None:
            application = self.application

        if len(self.input_nodes) == 0:
            return None

        assert self._input_nodes is not None

        if len(self._input_nodes) == 1:
            evdev = self._input_nodes[0]
            if self.match_evdev_rule(application, evdev.libevdev):
                return evdev.libevdev
        else:
            for _evdev in self._input_nodes:
                if _evdev.matches_application(application, self.application_matches):
                    if self.match_evdev_rule(application, _evdev.libevdev):
                        return _evdev.libevdev

        return None

    def is_ready(self):
        """Returns whether a UHID device is ready. Can be overwritten in
        subclasses to add extra conditions on when to consider a UHID
        device ready. This can be:

        - we need to wait on different types of input devices to be ready
          (Touch Screen and Pen for example)
        - we need to have at least 4 LEDs present
          (len(self.uhdev.led_classes) == 4)
        - or any other combinations"""
        return self.kernel_is_ready
 |