diff mbox series

[10/18] selftests/hid: import base_device.py from hid-tools

Message ID 20240410-bpf_sources-v1-10-a8bf16033ef8@kernel.org (mailing list archive)
State Mainlined
Commit a7def2e51c667578140d9aa3282533463ed3df91
Headers show
Series HID: Include current HID-BPF fixes in tree | expand

Commit Message

Benjamin Tissoires April 10, 2024, 5:19 p.m. UTC
We need to slightly change base_device.py for supporting HID-BPF,
so instead of monkey patching, let's just embed it in the kernel tree.

Signed-off-by: Benjamin Tissoires <bentiss@kernel.org>
---
 tools/testing/selftests/hid/tests/base.py        |   2 +-
 tools/testing/selftests/hid/tests/base_device.py | 412 +++++++++++++++++++++++
 2 files changed, 413 insertions(+), 1 deletion(-)
diff mbox series

Patch

diff --git a/tools/testing/selftests/hid/tests/base.py b/tools/testing/selftests/hid/tests/base.py
index 51433063b227..6bb5b887baaf 100644
--- a/tools/testing/selftests/hid/tests/base.py
+++ b/tools/testing/selftests/hid/tests/base.py
@@ -12,7 +12,7 @@  import time
 
 import logging
 
-from hidtools.device.base_device import BaseDevice, EvdevMatch, SysfsFile
+from .base_device import BaseDevice, EvdevMatch, SysfsFile
 from pathlib import Path
 from typing import Final, List, Tuple
 
diff --git a/tools/testing/selftests/hid/tests/base_device.py b/tools/testing/selftests/hid/tests/base_device.py
new file mode 100644
index 000000000000..092c7c4e62ef
--- /dev/null
+++ b/tools/testing/selftests/hid/tests/base_device.py
@@ -0,0 +1,412 @@ 
+#!/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Benjamin Tissoires <benjamin.tissoires@gmail.com>
+# Copyright (c) 2017 Red Hat, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import fcntl
+import functools
+import libevdev
+import os
+
+try:
+    import pyudev
+except ImportError:
+    raise ImportError("UHID is not supported due to missing pyudev dependency")
+
+import logging
+
+import hidtools.hid as hid
+from hidtools.uhid import UHIDDevice
+from hidtools.util import BusType
+
+from pathlib import Path
+from typing import Any, ClassVar, Dict, List, Optional, Type, Union
+
+logger = logging.getLogger("hidtools.device.base_device")
+
+
+class SysfsFile(object):
+    def __init__(self, path):
+        self.path = path
+
+    def __set_value(self, value):
+        with open(self.path, "w") as f:
+            return f.write(f"{value}\n")
+
+    def __get_value(self):
+        with open(self.path) as f:
+            return f.read().strip()
+
+    @property
+    def int_value(self) -> int:
+        return int(self.__get_value())
+
+    @int_value.setter
+    def int_value(self, v: int) -> None:
+        self.__set_value(v)
+
+    @property
+    def str_value(self) -> str:
+        return self.__get_value()
+
+    @str_value.setter
+    def str_value(self, v: str) -> None:
+        self.__set_value(v)
+
+
+class LED(object):
+    def __init__(self, sys_path):
+        self.max_brightness = SysfsFile(sys_path / "max_brightness").int_value
+        self.__brightness = SysfsFile(sys_path / "brightness")
+
+    @property
+    def brightness(self) -> int:
+        return self.__brightness.int_value
+
+    @brightness.setter
+    def brightness(self, value: int) -> None:
+        self.__brightness.int_value = value
+
+
+class PowerSupply(object):
+    """Represents Linux power_supply_class sysfs nodes."""
+
+    def __init__(self, sys_path):
+        self._capacity = SysfsFile(sys_path / "capacity")
+        self._status = SysfsFile(sys_path / "status")
+        self._type = SysfsFile(sys_path / "type")
+
+    @property
+    def capacity(self) -> int:
+        return self._capacity.int_value
+
+    @property
+    def status(self) -> str:
+        return self._status.str_value
+
+    @property
+    def type(self) -> str:
+        return self._type.str_value
+
+
+class HIDIsReady(object):
+    """
+    Companion class that binds to a kernel mechanism
+    and that allows to know when a uhid device is ready or not.
+
+    See :meth:`is_ready` for details.
+    """
+
+    def __init__(self: "HIDIsReady", uhid: UHIDDevice) -> None:
+        self.uhid = uhid
+
+    def is_ready(self: "HIDIsReady") -> bool:
+        """
+        Overwrite in subclasses: should return True or False whether
+        the attached uhid device is ready or not.
+        """
+        return False
+
+
+class UdevHIDIsReady(HIDIsReady):
+    _pyudev_context: ClassVar[Optional[pyudev.Context]] = None
+    _pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None
+    _uhid_devices: ClassVar[Dict[int, bool]] = {}
+
+    def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None:
+        super().__init__(uhid)
+        self._init_pyudev()
+
+    @classmethod
+    def _init_pyudev(cls: Type["UdevHIDIsReady"]) -> None:
+        if cls._pyudev_context is None:
+            cls._pyudev_context = pyudev.Context()
+            cls._pyudev_monitor = pyudev.Monitor.from_netlink(cls._pyudev_context)
+            cls._pyudev_monitor.filter_by("hid")
+            cls._pyudev_monitor.start()
+
+            UHIDDevice._append_fd_to_poll(
+                cls._pyudev_monitor.fileno(), cls._cls_udev_event_callback
+            )
+
+    @classmethod
+    def _cls_udev_event_callback(cls: Type["UdevHIDIsReady"]) -> None:
+        if cls._pyudev_monitor is None:
+            return
+        event: pyudev.Device
+        for event in iter(functools.partial(cls._pyudev_monitor.poll, 0.02), None):
+            if event.action not in ["bind", "remove"]:
+                return
+
+            logger.debug(f"udev event: {event.action} -> {event}")
+
+            id = int(event.sys_path.strip().split(".")[-1], 16)
+
+            cls._uhid_devices[id] = event.action == "bind"
+
+    def is_ready(self: "UdevHIDIsReady") -> bool:
+        try:
+            return self._uhid_devices[self.uhid.hid_id]
+        except KeyError:
+            return False
+
+
+class EvdevMatch(object):
+    def __init__(
+        self: "EvdevMatch",
+        *,
+        requires: List[Any] = [],
+        excludes: List[Any] = [],
+        req_properties: List[Any] = [],
+        excl_properties: List[Any] = [],
+    ) -> None:
+        self.requires = requires
+        self.excludes = excludes
+        self.req_properties = req_properties
+        self.excl_properties = excl_properties
+
+    def is_a_match(self: "EvdevMatch", evdev: libevdev.Device) -> bool:
+        for m in self.requires:
+            if not evdev.has(m):
+                return False
+        for m in self.excludes:
+            if evdev.has(m):
+                return False
+        for p in self.req_properties:
+            if not evdev.has_property(p):
+                return False
+        for p in self.excl_properties:
+            if evdev.has_property(p):
+                return False
+        return True
+
+
+class EvdevDevice(object):
+    """
+    Represents an Evdev node and its properties.
+    This is a stub for the libevdev devices, as they are relying on
+    uevent to get the data, saving us some ioctls to fetch the names
+    and properties.
+    """
+
+    def __init__(self: "EvdevDevice", sysfs: Path) -> None:
+        self.sysfs = sysfs
+        self.event_node: Any = None
+        self.libevdev: Optional[libevdev.Device] = None
+
+        self.uevents = {}
+        # all of the interesting properties are stored in the input uevent, so in the parent
+        # so convert the uevent file of the parent input node into a dict
+        with open(sysfs.parent / "uevent") as f:
+            for line in f.readlines():
+                key, value = line.strip().split("=")
+                self.uevents[key] = value.strip('"')
+
+        # we open all evdev nodes in order to not miss any event
+        self.open()
+
+    @property
+    def name(self: "EvdevDevice") -> str:
+        assert "NAME" in self.uevents
+
+        return self.uevents["NAME"]
+
+    @property
+    def evdev(self: "EvdevDevice") -> Path:
+        return Path("/dev/input") / self.sysfs.name
+
+    def matches_application(
+        self: "EvdevDevice", application: str, matches: Dict[str, EvdevMatch]
+    ) -> bool:
+        if self.libevdev is None:
+            return False
+
+        if application in matches:
+            return matches[application].is_a_match(self.libevdev)
+
+        logger.error(
+            f"application '{application}' is unknown, please update/fix hid-tools"
+        )
+        assert False  # hid-tools likely needs an update
+
+    def open(self: "EvdevDevice") -> libevdev.Device:
+        self.event_node = open(self.evdev, "rb")
+        self.libevdev = libevdev.Device(self.event_node)
+
+        assert self.libevdev.fd is not None
+
+        fd = self.libevdev.fd.fileno()
+        flag = fcntl.fcntl(fd, fcntl.F_GETFD)
+        fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
+
+        return self.libevdev
+
+    def close(self: "EvdevDevice") -> None:
+        if self.libevdev is not None and self.libevdev.fd is not None:
+            self.libevdev.fd.close()
+            self.libevdev = None
+        if self.event_node is not None:
+            self.event_node.close()
+            self.event_node = None
+
+
+class BaseDevice(UHIDDevice):
+    # default _application_matches that matches nothing. This needs
+    # to be set in the subclasses to have get_evdev() working
+    _application_matches: Dict[str, EvdevMatch] = {}
+
+    def __init__(
+        self,
+        name,
+        application,
+        rdesc_str: Optional[str] = None,
+        rdesc: Optional[Union[hid.ReportDescriptor, str, bytes]] = None,
+        input_info=None,
+    ) -> None:
+        self._kernel_is_ready: HIDIsReady = UdevHIDIsReady(self)
+        if rdesc_str is None and rdesc is None:
+            raise Exception("Please provide at least a rdesc or rdesc_str")
+        super().__init__()
+        if name is None:
+            name = f"uhid gamepad test {self.__class__.__name__}"
+        if input_info is None:
+            input_info = (BusType.USB, 1, 2)
+        self.name = name
+        self.info = input_info
+        self.default_reportID = None
+        self.opened = False
+        self.started = False
+        self.application = application
+        self._input_nodes: Optional[list[EvdevDevice]] = None
+        if rdesc is None:
+            assert rdesc_str is not None
+            self.rdesc = hid.ReportDescriptor.from_human_descr(rdesc_str)  # type: ignore
+        else:
+            self.rdesc = rdesc  # type: ignore
+
+    @property
+    def power_supply_class(self: "BaseDevice") -> Optional[PowerSupply]:
+        ps = self.walk_sysfs("power_supply", "power_supply/*")
+        if ps is None or len(ps) < 1:
+            return None
+
+        return PowerSupply(ps[0])
+
+    @property
+    def led_classes(self: "BaseDevice") -> List[LED]:
+        leds = self.walk_sysfs("led", "**/max_brightness")
+        if leds is None:
+            return []
+
+        return [LED(led.parent) for led in leds]
+
+    @property
+    def kernel_is_ready(self: "BaseDevice") -> bool:
+        return self._kernel_is_ready.is_ready() and self.started
+
+    @property
+    def input_nodes(self: "BaseDevice") -> List[EvdevDevice]:
+        if self._input_nodes is not None:
+            return self._input_nodes
+
+        if not self.kernel_is_ready or not self.started:
+            return []
+
+        self._input_nodes = [
+            EvdevDevice(path)
+            for path in self.walk_sysfs("input", "input/input*/event*")
+        ]
+        return self._input_nodes
+
+    def match_evdev_rule(self, application, evdev):
+        """Replace this in subclasses if the device has multiple reports
+        of the same type and we need to filter based on the actual evdev
+        node.
+
+        returning True will append the corresponding report to
+        `self.input_nodes[type]`
+        returning False  will ignore this report / type combination
+        for the device.
+        """
+        return True
+
+    def open(self):
+        self.opened = True
+
+    def _close_all_opened_evdev(self):
+        if self._input_nodes is not None:
+            for e in self._input_nodes:
+                e.close()
+
+    def __del__(self):
+        self._close_all_opened_evdev()
+
+    def close(self):
+        self.opened = False
+
+    def start(self, flags):
+        self.started = True
+
+    def stop(self):
+        self.started = False
+        self._close_all_opened_evdev()
+
+    def next_sync_events(self, application=None):
+        evdev = self.get_evdev(application)
+        if evdev is not None:
+            return list(evdev.events())
+        return []
+
+    @property
+    def application_matches(self: "BaseDevice") -> Dict[str, EvdevMatch]:
+        return self._application_matches
+
+    @application_matches.setter
+    def application_matches(self: "BaseDevice", data: Dict[str, EvdevMatch]) -> None:
+        self._application_matches = data
+
+    def get_evdev(self, application=None):
+        if application is None:
+            application = self.application
+
+        if len(self.input_nodes) == 0:
+            return None
+
+        assert self._input_nodes is not None
+
+        if len(self._input_nodes) == 1:
+            evdev = self._input_nodes[0]
+            if self.match_evdev_rule(application, evdev.libevdev):
+                return evdev.libevdev
+        else:
+            for _evdev in self._input_nodes:
+                if _evdev.matches_application(application, self.application_matches):
+                    if self.match_evdev_rule(application, _evdev.libevdev):
+                        return _evdev.libevdev
+
+    def is_ready(self):
+        """Returns whether a UHID device is ready. Can be overwritten in
+        subclasses to add extra conditions on when to consider a UHID
+        device ready. This can be:
+
+        - we need to wait on different types of input devices to be ready
+          (Touch Screen and Pen for example)
+        - we need to have at least 4 LEDs present
+          (len(self.uhdev.leds_classes) == 4)
+        - or any other combinations"""
+        return self.kernel_is_ready