diff mbox series

[RFC,v1,03/10] roadtest: add framework

Message ID 20220311162445.346685-4-vincent.whitchurch@axis.com (mailing list archive)
State New
Headers show
Series roadtest: a driver testing framework | expand

Commit Message

Vincent Whitchurch March 11, 2022, 4:24 p.m. UTC
Add the bulk of the roadtest framework.  Apart from one init shell
script, this is written in Python and includes three closely-related
parts:

 - The test runner which is invoked from the command line by the user
   and which starts the backend and sends the test jobs and results
   to/from UML.

 - Test support code which is used by the actual driver tests run inside
   UML and which interact with the backend via a file-based asynchronous
   communication method.

 - The backend which is run by the Python interpreter embedded in the C
   backend.  This part runs the hardware models and is controlled by the
   tests and the driver (via virtio in the C backend).

Some unit tests for the framework itself are included and these will be
automatically run whenever the driver tests are run.

Signed-off-by: Vincent Whitchurch <vincent.whitchurch@axis.com>
---
 tools/testing/roadtest/init.sh                |  19 ++
 tools/testing/roadtest/roadtest/__init__.py   |   2 +
 .../roadtest/roadtest/backend/__init__.py     |   0
 .../roadtest/roadtest/backend/backend.py      |  32 ++
 .../testing/roadtest/roadtest/backend/gpio.py | 111 +++++++
 .../testing/roadtest/roadtest/backend/i2c.py  | 123 ++++++++
 .../testing/roadtest/roadtest/backend/main.py |  13 +
 .../testing/roadtest/roadtest/backend/mock.py |  20 ++
 .../roadtest/roadtest/backend/test_gpio.py    |  98 ++++++
 .../roadtest/roadtest/backend/test_i2c.py     |  84 +++++
 .../testing/roadtest/roadtest/cmd/__init__.py |   0
 tools/testing/roadtest/roadtest/cmd/main.py   | 146 +++++++++
 tools/testing/roadtest/roadtest/cmd/remote.py |  48 +++
 .../roadtest/roadtest/core/__init__.py        |   0
 .../testing/roadtest/roadtest/core/control.py |  52 ++++
 .../roadtest/roadtest/core/devicetree.py      | 155 ++++++++++
 .../roadtest/roadtest/core/hardware.py        |  94 ++++++
 tools/testing/roadtest/roadtest/core/log.py   |  42 +++
 .../testing/roadtest/roadtest/core/modules.py |  38 +++
 .../testing/roadtest/roadtest/core/opslog.py  |  35 +++
 tools/testing/roadtest/roadtest/core/proxy.py |  48 +++
 tools/testing/roadtest/roadtest/core/suite.py | 286 ++++++++++++++++++
 tools/testing/roadtest/roadtest/core/sysfs.py |  77 +++++
 .../roadtest/roadtest/core/test_control.py    |  35 +++
 .../roadtest/roadtest/core/test_devicetree.py |  31 ++
 .../roadtest/roadtest/core/test_hardware.py   |  41 +++
 .../roadtest/roadtest/core/test_log.py        |  54 ++++
 .../roadtest/roadtest/core/test_opslog.py     |  27 ++
 .../roadtest/roadtest/tests/__init__.py       |   0
 29 files changed, 1711 insertions(+)
 create mode 100755 tools/testing/roadtest/init.sh
 create mode 100644 tools/testing/roadtest/roadtest/__init__.py
 create mode 100644 tools/testing/roadtest/roadtest/backend/__init__.py
 create mode 100644 tools/testing/roadtest/roadtest/backend/backend.py
 create mode 100644 tools/testing/roadtest/roadtest/backend/gpio.py
 create mode 100644 tools/testing/roadtest/roadtest/backend/i2c.py
 create mode 100644 tools/testing/roadtest/roadtest/backend/main.py
 create mode 100644 tools/testing/roadtest/roadtest/backend/mock.py
 create mode 100644 tools/testing/roadtest/roadtest/backend/test_gpio.py
 create mode 100644 tools/testing/roadtest/roadtest/backend/test_i2c.py
 create mode 100644 tools/testing/roadtest/roadtest/cmd/__init__.py
 create mode 100644 tools/testing/roadtest/roadtest/cmd/main.py
 create mode 100644 tools/testing/roadtest/roadtest/cmd/remote.py
 create mode 100644 tools/testing/roadtest/roadtest/core/__init__.py
 create mode 100644 tools/testing/roadtest/roadtest/core/control.py
 create mode 100644 tools/testing/roadtest/roadtest/core/devicetree.py
 create mode 100644 tools/testing/roadtest/roadtest/core/hardware.py
 create mode 100644 tools/testing/roadtest/roadtest/core/log.py
 create mode 100644 tools/testing/roadtest/roadtest/core/modules.py
 create mode 100644 tools/testing/roadtest/roadtest/core/opslog.py
 create mode 100644 tools/testing/roadtest/roadtest/core/proxy.py
 create mode 100644 tools/testing/roadtest/roadtest/core/suite.py
 create mode 100644 tools/testing/roadtest/roadtest/core/sysfs.py
 create mode 100644 tools/testing/roadtest/roadtest/core/test_control.py
 create mode 100644 tools/testing/roadtest/roadtest/core/test_devicetree.py
 create mode 100644 tools/testing/roadtest/roadtest/core/test_hardware.py
 create mode 100644 tools/testing/roadtest/roadtest/core/test_log.py
 create mode 100644 tools/testing/roadtest/roadtest/core/test_opslog.py
 create mode 100644 tools/testing/roadtest/roadtest/tests/__init__.py
diff mbox series

Patch

diff --git a/tools/testing/roadtest/init.sh b/tools/testing/roadtest/init.sh
new file mode 100755
index 000000000000..c5fb28478aa3
--- /dev/null
+++ b/tools/testing/roadtest/init.sh
@@ -0,0 +1,19 @@ 
+#!/bin/sh
+# SPDX-License-Identifier: GPL-2.0-only
+
+mount -t proc proc /proc
+echo 8 > /proc/sys/kernel/printk
+mount -t sysfs nodev /sys
+mount -t debugfs nodev /sys/kernel/debug
+
+echo 0 > /sys/bus/i2c/drivers_autoprobe
+echo 0 > /sys/bus/platform/drivers_autoprobe
+
+python3 -m roadtest.cmd.remote
+status=$?
+[ "${ROADTEST_SHELL}" = "1" ] || {
+    # rsync doesn't handle these zero-sized files correctly.
+    cp -ra --no-preserve=ownership /sys/kernel/debug/gcov ${ROADTEST_WORK_DIR}/gcov
+    echo o > /proc/sysrq-trigger
+}
+exec setsid sh -c 'exec bash </dev/tty0 >/dev/tty0 2>&1'
diff --git a/tools/testing/roadtest/roadtest/__init__.py b/tools/testing/roadtest/roadtest/__init__.py
new file mode 100644
index 000000000000..dac3ce6976e5
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/__init__.py
@@ -0,0 +1,2 @@ 
+ENV_WORK_DIR = "ROADTEST_WORK_DIR"
+ENV_BUILD_DIR = "ROADTEST_BUILD_DIR"
diff --git a/tools/testing/roadtest/roadtest/backend/__init__.py b/tools/testing/roadtest/roadtest/backend/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/tools/testing/roadtest/roadtest/backend/backend.py b/tools/testing/roadtest/roadtest/backend/backend.py
new file mode 100644
index 000000000000..bfd19fc363c2
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/backend/backend.py
@@ -0,0 +1,32 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import logging
+import os
+from pathlib import Path
+
+from roadtest import ENV_WORK_DIR
+from roadtest.core.control import ControlReader
+
+from . import gpio, i2c, mock
+
+logger = logging.getLogger(__name__)
+
+try:
+    import cbackend  # type: ignore[import]
+except ModuleNotFoundError:
+    # In unit tests
+    cbackend = None
+
+
+class Backend:
+    def __init__(self) -> None:
+        work = Path(os.environ[ENV_WORK_DIR])
+        self.control = ControlReader(work_dir=work)
+        self.c = cbackend
+        self.i2c = i2c.I2CBackend(self)
+        self.gpio = gpio.GpioBackend(self)
+        self.mock = mock.MockBackend(work)
+
+    def process_control(self) -> None:
+        self.control.process({"backend": self})
diff --git a/tools/testing/roadtest/roadtest/backend/gpio.py b/tools/testing/roadtest/roadtest/backend/gpio.py
new file mode 100644
index 000000000000..2eaf52b31c72
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/backend/gpio.py
@@ -0,0 +1,111 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import logging
+import typing
+from typing import Optional
+
+if typing.TYPE_CHECKING:
+    # Avoid circular imports
+    from .backend import Backend
+
+logger = logging.getLogger(__name__)
+
+
+class Gpio:
+    IRQ_TYPE_NONE = 0x00
+    IRQ_TYPE_EDGE_RISING = 0x01
+    IRQ_TYPE_EDGE_FALLING = 0x02
+    IRQ_TYPE_EDGE_BOTH = 0x03
+    IRQ_TYPE_LEVEL_HIGH = 0x04
+    IRQ_TYPE_LEVEL_LOW = 0x08
+
+    def __init__(self, backend: "Backend", pin: int):
+        self.backend = backend
+        self.pin = pin
+        self.state = False
+        self.irq_type = Gpio.IRQ_TYPE_NONE
+        self.masked = True
+        self.edge_irq_latched = False
+
+    def _level_irq_active(self) -> bool:
+        if self.irq_type == Gpio.IRQ_TYPE_LEVEL_HIGH:
+            return self.state
+        elif self.irq_type == Gpio.IRQ_TYPE_LEVEL_LOW:
+            return not self.state
+
+        return False
+
+    def _latch_edge_irq(self, old: bool, new: bool) -> bool:
+        if old != new:
+            logger.debug(f"{self}: latch_edge_irq {self.irq_type} {old} -> {new}")
+
+        if self.irq_type == Gpio.IRQ_TYPE_EDGE_RISING:
+            return not old and new
+        elif self.irq_type == Gpio.IRQ_TYPE_EDGE_FALLING:
+            return old and not new
+        elif self.irq_type == Gpio.IRQ_TYPE_EDGE_BOTH:
+            return old != new
+
+        return False
+
+    def _check_irq(self) -> None:
+        if self.irq_type == Gpio.IRQ_TYPE_NONE or self.masked:
+            return
+        if not self.edge_irq_latched and not self._level_irq_active():
+            return
+
+        self.masked = True
+        self.edge_irq_latched = False
+
+        logger.debug(f"{self}: trigger irq")
+        self.backend.c.trigger_gpio_irq(self.pin)
+
+    def set_irq_type(self, irq_type: int) -> None:
+        logger.debug(f"{self}: set_irq_type {irq_type}")
+        if irq_type == Gpio.IRQ_TYPE_NONE:
+            self.masked = True
+
+        self.irq_type = irq_type
+        self.edge_irq_latched = False
+        self._check_irq()
+
+    def unmask(self) -> None:
+        logger.debug(f"{self}: unmask")
+        self.masked = False
+        self._check_irq()
+
+    def set(self, val: int) -> None:
+        old = self.state
+        new = bool(val)
+
+        if old != new:
+            logger.debug(f"{self}: gpio set {old} -> {new}")
+
+        self.state = new
+        if self._latch_edge_irq(old, new):
+            logger.debug(f"{self}: latching edge")
+            self.edge_irq_latched = True
+
+        self._check_irq()
+
+    def __str__(self) -> str:
+        return f"Gpio({self.pin})"
+
+
+class GpioBackend:
+    def __init__(self, backend: "Backend") -> None:
+        self.backend = backend
+        self.gpios = [Gpio(backend, pin) for pin in range(64)]
+
+    def set(self, pin: Optional[int], val: bool) -> None:
+        if pin is None:
+            return
+
+        self.gpios[pin].set(val)
+
+    def set_irq_type(self, pin: int, irq_type: int) -> None:
+        self.gpios[pin].set_irq_type(irq_type)
+
+    def unmask(self, pin: int) -> None:
+        self.gpios[pin].unmask()
diff --git a/tools/testing/roadtest/roadtest/backend/i2c.py b/tools/testing/roadtest/roadtest/backend/i2c.py
new file mode 100644
index 000000000000..b877c2b76851
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/backend/i2c.py
@@ -0,0 +1,123 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import abc
+import importlib
+import logging
+import typing
+from typing import Any, Literal, Optional
+
+if typing.TYPE_CHECKING:
+    # Avoid circular imports
+    from .backend import Backend
+
+logger = logging.getLogger(__name__)
+
+
+class I2CBackend:
+    def __init__(self, backend: "Backend") -> None:
+        self.model: Optional[I2CModel] = None
+        self.backend = backend
+
+    def load_model(self, modname: str, clsname: str, *args: Any, **kwargs: Any) -> None:
+        mod = importlib.import_module(modname)
+        cls = getattr(mod, clsname)
+        self.model = cls(*args, **kwargs, backend=self.backend)
+
+    def unload_model(self) -> None:
+        self.model = None
+
+    def read(self, length: int) -> bytes:
+        if not self.model:
+            raise Exception("No I2C model loaded")
+
+        return self.model.read(length)
+
+    def write(self, data: bytes) -> None:
+        if not self.model:
+            raise Exception("No I2C model loaded")
+
+        self.model.write(data)
+
+    def __getattr__(self, name: str) -> Any:
+        return getattr(self.model, name)
+
+
+class I2CModel(abc.ABC):
+    def __init__(self, backend: "Backend") -> None:
+        self.backend = backend
+
+    @abc.abstractmethod
+    def read(self, length: int) -> bytes:
+        return bytes(length)
+
+    @abc.abstractmethod
+    def write(self, data: bytes) -> None:
+        pass
+
+
+class SMBusModel(I2CModel):
+    def __init__(
+        self,
+        regbytes: int,
+        byteorder: Literal["little", "big"] = "little",
+        *args: Any,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.reg_addr = 0x0
+        self.regbytes = regbytes
+        self.byteorder = byteorder
+
+    @abc.abstractmethod
+    def reg_read(self, addr: int) -> int:
+        return 0
+
+    @abc.abstractmethod
+    def reg_write(self, addr: int, val: int) -> None:
+        pass
+
+    def val_to_bytes(self, val: int) -> bytes:
+        return val.to_bytes(self.regbytes, self.byteorder)
+
+    def bytes_to_val(self, data: bytes) -> int:
+        return int.from_bytes(data, self.byteorder)
+
+    def read(self, length: int) -> bytes:
+        data = bytearray()
+        for idx in range(0, length, self.regbytes):
+            addr = self.reg_addr + idx
+            val = self.reg_read(addr)
+            logger.debug(f"SMBus read {addr=:#02x} {val=:#02x}")
+            data += self.val_to_bytes(val)
+        return bytes(data)
+
+    def write(self, data: bytes) -> None:
+        self.reg_addr = data[0]
+
+        if len(data) > 1:
+            length = len(data) - 1
+            data = data[1:]
+            assert length % self.regbytes == 0
+            for idx in range(0, length, self.regbytes):
+                val = self.bytes_to_val(data[idx : (idx + self.regbytes)])
+                addr = self.reg_addr + idx
+                self.backend.mock.reg_write(addr, val)
+                self.reg_write(addr, val)
+                logger.debug(f"SMBus write {addr=:#02x} {val=:#02x}")
+        elif len(data) == 1:
+            pass
+
+
+class SimpleSMBusModel(SMBusModel):
+    def __init__(self, regs: dict[int, int], **kwargs: Any) -> None:
+        super().__init__(**kwargs)
+        self.regs = regs
+
+    def reg_read(self, addr: int) -> int:
+        val = self.regs[addr]
+        return val
+
+    def reg_write(self, addr: int, val: int) -> None:
+        assert addr in self.regs
+        self.regs[addr] = val
diff --git a/tools/testing/roadtest/roadtest/backend/main.py b/tools/testing/roadtest/roadtest/backend/main.py
new file mode 100644
index 000000000000..25be86ded9ea
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/backend/main.py
@@ -0,0 +1,13 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import logging
+
+import roadtest.backend.backend
+
+logging.basicConfig(
+    format="%(asctime)s - %(levelname)s - %(name)s: %(message)s", level=logging.DEBUG
+)
+
+backend = roadtest.backend.backend.Backend()
+backend.process_control()
diff --git a/tools/testing/roadtest/roadtest/backend/mock.py b/tools/testing/roadtest/roadtest/backend/mock.py
new file mode 100644
index 000000000000..8ce33a6bc0f1
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/backend/mock.py
@@ -0,0 +1,20 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import functools
+from pathlib import Path
+from typing import Any, Callable
+
+from roadtest.core.opslog import OpsLogWriter
+
+
+class MockBackend:
+    def __init__(self, work: Path) -> None:
+        self.opslog = OpsLogWriter(work)
+
+    @functools.cache
+    def __getattr__(self, name: str) -> Callable:
+        def func(*args: Any, **kwargs: Any) -> None:
+            self.opslog.write(f"mock.{name}(*{str(args)}, **{str(kwargs)})")
+
+        return func
diff --git a/tools/testing/roadtest/roadtest/backend/test_gpio.py b/tools/testing/roadtest/roadtest/backend/test_gpio.py
new file mode 100644
index 000000000000..feffe4fb9625
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/backend/test_gpio.py
@@ -0,0 +1,98 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import unittest
+from unittest.mock import MagicMock
+
+from .gpio import Gpio
+
+
+class TestGpio(unittest.TestCase):
+    def test_irq_low(self) -> None:
+        m = MagicMock()
+        gpio = Gpio(backend=m, pin=1)
+
+        gpio.set_irq_type(Gpio.IRQ_TYPE_LEVEL_LOW)
+        m.c.trigger_gpio_irq.assert_not_called()
+
+        gpio.unmask()
+        m.c.trigger_gpio_irq.assert_called_once_with(1)
+        m.c.trigger_gpio_irq.reset_mock()
+
+        gpio.set(True)
+        gpio.unmask()
+        m.c.trigger_gpio_irq.assert_not_called()
+
+    def test_irq_high(self) -> None:
+        m = MagicMock()
+        gpio = Gpio(backend=m, pin=2)
+
+        gpio.set_irq_type(Gpio.IRQ_TYPE_LEVEL_HIGH)
+        gpio.unmask()
+
+        m.c.trigger_gpio_irq.assert_not_called()
+
+        gpio.set(True)
+        m.c.trigger_gpio_irq.assert_called_once_with(2)
+        m.c.trigger_gpio_irq.reset_mock()
+
+        gpio.set(False)
+        gpio.unmask()
+        m.c.trigger_gpio_irq.assert_not_called()
+
+    def test_irq_rising(self) -> None:
+        m = MagicMock()
+        gpio = Gpio(backend=m, pin=63)
+
+        gpio.set_irq_type(Gpio.IRQ_TYPE_EDGE_RISING)
+        gpio.set(False)
+        gpio.set(True)
+
+        m.c.trigger_gpio_irq.assert_not_called()
+        gpio.unmask()
+        m.c.trigger_gpio_irq.assert_called_once_with(63)
+        m.c.trigger_gpio_irq.reset_mock()
+
+        gpio.set(False)
+        gpio.set(True)
+
+        gpio.unmask()
+        m.c.trigger_gpio_irq.assert_called_once()
+
+    def test_irq_falling(self) -> None:
+        m = MagicMock()
+        gpio = Gpio(backend=m, pin=0)
+
+        gpio.set_irq_type(Gpio.IRQ_TYPE_EDGE_FALLING)
+        gpio.unmask()
+        gpio.set(False)
+        gpio.set(True)
+        m.c.trigger_gpio_irq.assert_not_called()
+
+        gpio.set(False)
+        m.c.trigger_gpio_irq.assert_called_once_with(0)
+        m.c.trigger_gpio_irq.reset_mock()
+
+        gpio.set(True)
+        gpio.set(False)
+        gpio.set(True)
+        gpio.unmask()
+        m.c.trigger_gpio_irq.assert_called_once()
+
+    def test_irq_both(self) -> None:
+        m = MagicMock()
+        gpio = Gpio(backend=m, pin=32)
+
+        gpio.set_irq_type(Gpio.IRQ_TYPE_EDGE_BOTH)
+        gpio.unmask()
+        gpio.set(False)
+        gpio.set(True)
+        m.c.trigger_gpio_irq.assert_called_once_with(32)
+
+        gpio.set(False)
+        m.c.trigger_gpio_irq.assert_called_once_with(32)
+        m.c.trigger_gpio_irq.reset_mock()
+
+        gpio.set(True)
+        gpio.unmask()
+        m.c.trigger_gpio_irq.assert_called_once_with(32)
diff --git a/tools/testing/roadtest/roadtest/backend/test_i2c.py b/tools/testing/roadtest/roadtest/backend/test_i2c.py
new file mode 100644
index 000000000000..eda4e1a4b80f
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/backend/test_i2c.py
@@ -0,0 +1,84 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import unittest
+from typing import Any
+from unittest.mock import MagicMock
+
+from .i2c import SimpleSMBusModel, SMBusModel
+
+
+class DummyModel(SMBusModel):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+        self.regs: dict[int, int] = {}
+
+    def reg_read(self, addr: int) -> int:
+        return self.regs[addr]
+
+    def reg_write(self, addr: int, val: int) -> None:
+        self.regs[addr] = val
+
+
+class TestSMBusModel(unittest.TestCase):
+    def test_1(self) -> None:
+        m = DummyModel(regbytes=1, backend=MagicMock())
+
+        m.write(bytes([0x12, 0x34]))
+        m.write(bytes([0x13, 0xAB, 0xCD]))
+
+        self.assertEqual(m.regs[0x12], 0x34)
+        self.assertEqual(m.regs[0x13], 0xAB)
+        self.assertEqual(m.regs[0x14], 0xCD)
+
+        m.write(bytes([0x12]))
+        self.assertEqual(m.read(1), bytes([0x34]))
+
+        m.write(bytes([0x12]))
+        self.assertEqual(m.read(3), bytes([0x34, 0xAB, 0xCD]))
+
+    def test_2big(self) -> None:
+        m = DummyModel(regbytes=2, byteorder="big", backend=MagicMock())
+
+        m.write(bytes([0x12, 0x34, 0x56, 0xAB, 0xCD]))
+        self.assertEqual(m.regs[0x12], 0x3456)
+        self.assertEqual(m.regs[0x14], 0xABCD)
+
+        m.write(bytes([0x12]))
+        self.assertEqual(m.read(2), bytes([0x34, 0x56]))
+
+        m.write(bytes([0x14]))
+        self.assertEqual(m.read(2), bytes([0xAB, 0xCD]))
+
+        m.write(bytes([0x12]))
+        self.assertEqual(m.read(4), bytes([0x34, 0x56, 0xAB, 0xCD]))
+
+    def test_2little(self) -> None:
+        m = DummyModel(regbytes=2, byteorder="little", backend=MagicMock())
+
+        m.write(bytes([0x12, 0x34, 0x56, 0xAB, 0xCD]))
+        self.assertEqual(m.regs[0x12], 0x5634)
+        self.assertEqual(m.regs[0x14], 0xCDAB)
+
+        m.write(bytes([0x12]))
+        self.assertEqual(m.read(2), bytes([0x34, 0x56]))
+
+
+class TestSimpleSMBusModel(unittest.TestCase):
+    def test_simple(self) -> None:
+        m = SimpleSMBusModel(
+            regs={0x01: 0x12, 0x02: 0x34},
+            regbytes=1,
+            backend=MagicMock(),
+        )
+        self.assertEqual(m.reg_read(0x01), 0x12)
+        self.assertEqual(m.reg_read(0x02), 0x34)
+
+        m.reg_write(0x01, 0x56)
+        self.assertEqual(m.reg_read(0x01), 0x56)
+        self.assertEqual(m.reg_read(0x02), 0x34)
+
+        with self.assertRaises(Exception):
+            m.reg_write(0x03, 0x00)
+        with self.assertRaises(Exception):
+            m.reg_read(0x03)
diff --git a/tools/testing/roadtest/roadtest/cmd/__init__.py b/tools/testing/roadtest/roadtest/cmd/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/tools/testing/roadtest/roadtest/cmd/main.py b/tools/testing/roadtest/roadtest/cmd/main.py
new file mode 100644
index 000000000000..634c27fe795c
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/cmd/main.py
@@ -0,0 +1,146 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import argparse
+import fnmatch
+import sys
+import unittest
+from typing import Optional
+from unittest.suite import TestSuite
+
+assert sys.version_info >= (3, 9), "Python version is too old"
+
+from roadtest.core.suite import UMLSuite, UMLTestCase
+
+
+def make_umlsuite(args: argparse.Namespace) -> UMLSuite:
+    return UMLSuite(
+        timeout=args.timeout,
+        workdir=args.work_dir,
+        builddir=args.build_dir,
+        ksrcdir=args.ksrc_dir,
+        uml_args_pre=args.uml_prepend,
+        uml_args_post=args.uml_append,
+        shell=args.shell,
+    )
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--timeout",
+        type=int,
+        default=60,
+        help="Timeout (in seconds) for each UML run, 0 to disable",
+    )
+    parser.add_argument("--work-dir", type=str, help="Work directory for UML runs")
+    parser.add_argument("--build-dir", type=str, required=True)
+    parser.add_argument("--ksrc-dir", type=str, required=True)
+    parser.add_argument(
+        "--uml-prepend",
+        nargs="*",
+        default=[],
+        help="Extra arguments to prepend to the UML command (example: gdbserver :1234)",
+    )
+    parser.add_argument(
+        "--uml-append",
+        nargs="*",
+        default=[],
+        help="Extra arguments to append to the UML command (example: trace_event=i2c:* tp_printk)",
+    )
+    parser.add_argument(
+        "--filter",
+        nargs="+",
+        default=[],
+    )
+    parser.add_argument("--shell", action="store_true")
+    parser.add_argument("test", nargs="?", default="roadtest")
+    args = parser.parse_args()
+
+    if args.shell:
+        args.timeout = 0
+
+        if not any(p.startswith("con=") for p in args.uml_append):
+            print(
+                "Error: --shell used but no con= UML argument specified",
+                file=sys.stderr,
+            )
+            sys.exit(1)
+
+    test = args.test
+    test = test.replace("/", ".")
+    test = test.removesuffix(".py")
+    test = test.removesuffix(".")
+
+    loader = unittest.defaultTestLoader
+    suitegroups = loader.discover(test)
+
+    args.filter = [f"*{f}*" for f in args.filter]
+
+    # Backend tests and the like don't need to be run inside UML.
+    localsuite = None
+
+    # For simplicity, we currently run all target tests in one UML instance
+    # since python in UML is slow to start up.  This can be revisited if we
+    # want to run several UML instances in parallel.
+    deftargetsuite = None
+    targetsuites = []
+
+    for suites in suitegroups:
+        # unittest can in arbitrarily nest and mix TestCases
+        # and TestSuites, but we expect a fixed hierarchy.
+        assert isinstance(suites, unittest.TestSuite)
+
+        for suite in suites:
+            # assert not isinstance(suite, unittest.TestCase)
+
+            # If the import of a test fails, then suite is a
+            # unittest.loader._FailedTest instead of a suite
+            if not isinstance(suite, unittest.TestSuite):
+                suite = [suite]  # type: ignore[assignment]
+
+            # Suite at this level contains one TestCase for each
+            # test method in a particular test class.
+            #
+            # All the test functions for one particular test class
+            # can only be run either in UML or locally, not mixed.
+            destsuite: Optional[TestSuite] = None
+
+            for t in suite:  # type: ignore[union-attr]
+                # We don't support suites nested at this level.
+                assert isinstance(t, unittest.TestCase)
+
+                id = t.id()
+                if args.filter and not any(fnmatch.fnmatch(id, f) for f in args.filter):
+                    continue
+
+                if isinstance(t, UMLTestCase):
+                    if t.run_separately:
+                        if not destsuite:
+                            destsuite = make_umlsuite(args)
+                            targetsuites.append(destsuite)
+                    else:
+                        if not deftargetsuite:
+                            deftargetsuite = make_umlsuite(args)
+                            targetsuites.append(deftargetsuite)
+
+                        destsuite = deftargetsuite
+                else:
+                    if not localsuite:
+                        localsuite = TestSuite()
+                    destsuite = localsuite
+
+                if destsuite:
+                    destsuite.addTest(t)
+
+    tests = unittest.TestSuite()
+    if localsuite:
+        tests.addTest(localsuite)
+    tests.addTests(targetsuites)
+
+    result = unittest.TextTestRunner(verbosity=2).run(tests)
+    sys.exit(not result.wasSuccessful())
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tools/testing/roadtest/roadtest/cmd/remote.py b/tools/testing/roadtest/roadtest/cmd/remote.py
new file mode 100644
index 000000000000..29c3c6d35c65
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/cmd/remote.py
@@ -0,0 +1,48 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import importlib
+import json
+import os
+from pathlib import Path
+from typing import cast
+from unittest import TestSuite, TextTestRunner
+
+from roadtest import ENV_WORK_DIR
+from roadtest.core import proxy
+
+
+def main() -> None:
+    workdir = Path(os.environ[ENV_WORK_DIR])
+    with open(workdir / "tests.json") as f:
+        testinfos = json.load(f)
+
+    suite = TestSuite()
+    for info in testinfos:
+        id = info["id"]
+        *modparts, clsname, method = id.split(".")
+
+        fullname = ".".join(modparts)
+        mod = importlib.import_module(fullname)
+
+        cls = getattr(mod, clsname)
+        test = cls(methodName=method)
+
+        values = info["values"]
+        if values:
+            test.dts.values = values
+
+        suite.addTest(test)
+
+    runner = TextTestRunner(
+        verbosity=0, buffer=False, resultclass=proxy.ProxyTextTestResult
+    )
+    result = cast(proxy.ProxyTextTestResult, runner.run(suite))
+
+    proxyresult = result.to_proxy()
+    with open(workdir / "results.json", "w") as f:
+        json.dump(proxyresult, f)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tools/testing/roadtest/roadtest/core/__init__.py b/tools/testing/roadtest/roadtest/core/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/tools/testing/roadtest/roadtest/core/control.py b/tools/testing/roadtest/roadtest/core/control.py
new file mode 100644
index 000000000000..cd74861099b9
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/core/control.py
@@ -0,0 +1,52 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import logging
+import os
+from pathlib import Path
+from typing import Optional
+
+from roadtest import ENV_WORK_DIR
+
+CONTROL_FILE = "control.txt"
+
+logger = logging.getLogger(__name__)
+
+
+class ControlReader:
+    def __init__(self, work_dir: Optional[Path] = None) -> None:
+        if not work_dir:
+            work_dir = Path(os.environ[ENV_WORK_DIR])
+
+        path = work_dir / CONTROL_FILE
+        path.unlink(missing_ok=True)
+        path.write_text("")
+
+        self.file = path.open("r")
+
+    def process(self, vars: dict) -> None:
+        for line in self.file.readlines():
+            cmd = line.rstrip()
+
+            if cmd.startswith("# "):
+                logger.info(line[2:].rstrip())
+                continue
+
+            logger.debug(cmd)
+            eval(cmd, vars)
+
+
+class ControlWriter:
+    def __init__(self, work_dir: Optional[Path] = None) -> None:
+        if not work_dir:
+            work_dir = Path(os.environ[ENV_WORK_DIR])
+        self.file = (work_dir / CONTROL_FILE).open("a", buffering=1)
+
+    def write_cmd(self, line: str) -> None:
+        self.file.write(line + "\n")
+
+    def write_log(self, line: str) -> None:
+        self.file.write(f"# {line}\n")
+
+    def close(self) -> None:
+        self.file.close()
diff --git a/tools/testing/roadtest/roadtest/core/devicetree.py b/tools/testing/roadtest/roadtest/core/devicetree.py
new file mode 100644
index 000000000000..40876738fb39
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/core/devicetree.py
@@ -0,0 +1,155 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import enum
+import subprocess
+from pathlib import Path
+from typing import Any, Optional
+
+HEADER = """
+/dts-v1/;
+
+/ {
+    #address-cells = <2>;
+    #size-cells = <2>;
+
+    virtio@0 {
+        compatible = "virtio,uml";
+        socket-path = "WORK/gpio.sock";
+        virtio-device-id = <0x29>;
+
+        gpio: gpio {
+            compatible = "virtio,device29";
+
+            gpio-controller;
+            #gpio-cells = <2>;
+
+            interrupt-controller;
+            #interrupt-cells = <2>;
+        };
+    };
+
+    virtio@1 {
+        compatible = "virtio,uml";
+        socket-path = "WORK/i2c.sock";
+        virtio-device-id = <0x22>;
+
+        i2c: i2c {
+            compatible = "virtio,device22";
+
+            #address-cells = <1>;
+            #size-cells = <0>;
+        };
+    };
+
+    // See Hardware.kick()
+    leds {
+        compatible = "gpio-leds";
+        led0 {
+            gpios = <&gpio 0 0>;
+        };
+    };
+};
+"""
+
+
+class DtVar(enum.Enum):
+    I2C_ADDR = 0
+    GPIO_PIN = 1
+
+
+class DtFragment:
+    def __init__(self, src: str, variables: Optional[dict[str, DtVar]] = None) -> None:
+        self.src = src
+        if not variables:
+            variables = {}
+        self.variables = variables
+        self.values: dict[str, int] = {}
+
+    def apply(self, values: dict[str, Any]) -> str:
+        src = self.src
+
+        for var in self.variables.keys():
+            typ = self.variables[var]
+            val = values[var]
+
+            if typ == DtVar.I2C_ADDR:
+                str = f"{val:02x}"
+            elif typ == DtVar.GPIO_PIN:
+                str = f"{val:d}"
+
+            src = src.replace(f"${var}$", str)
+
+        self.values = values
+        return src
+
+    def __getitem__(self, key: str) -> Any:
+        return self.values[key]
+
+
+class Devicetree:
+    def __init__(self, workdir: Path, ksrcdir: Path) -> None:
+        self.workdir: Path = workdir
+        self.ksrcdir: Path = ksrcdir
+        self.next_i2c_addr: int = 0x1
+        # 0 is used for gpio-leds for Hardware.kick()
+        self.next_gpio_pin: int = 1
+        self.src: str = ""
+
+    def assemble(self, fragments: list[DtFragment]) -> None:
+        parts = []
+        for fragment in fragments:
+            if fragment.values:
+                # Multiple test functions from the same class will use
+                # the same class instance
+                continue
+
+            values = {}
+
+            for var, type in fragment.variables.items():
+                if type == DtVar.I2C_ADDR:
+                    values[var] = self.next_i2c_addr
+                    self.next_i2c_addr += 1
+                elif type == DtVar.GPIO_PIN:
+                    values[var] = self.next_gpio_pin
+                    self.next_gpio_pin += 1
+
+            parts.append(fragment.apply(values))
+
+        self.src = "\n".join(parts)
+
+    def compile(self, dtb: str) -> None:
+        dts = self.workdir / "test.dts"
+
+        try:
+            subprocess.run(
+                [
+                    "gcc",
+                    "-E",
+                    "-nostdinc",
+                    f"-I{self.ksrcdir}/scripts/dtc/include-prefixes",
+                    "-undef",
+                    "-D__DTS__",
+                    "-x",
+                    "assembler-with-cpp",
+                    "-o",
+                    dts,
+                    "-",
+                ],
+                input=self.src,
+                text=True,
+                check=True,
+                capture_output=True,
+            )
+
+            full = HEADER.replace("WORK", str(self.workdir)) + dts.read_text()
+            dts.write_text(full)
+
+            subprocess.run(
+                ["dtc", "-I", "dts", "-O", "dtb", dts, "-o", self.workdir / dtb],
+                check=True,
+                capture_output=True,
+                text=True,
+            )
+        except subprocess.CalledProcessError as e:
+            raise Exception(f"{e.stderr}")
diff --git a/tools/testing/roadtest/roadtest/core/hardware.py b/tools/testing/roadtest/roadtest/core/hardware.py
new file mode 100644
index 000000000000..ae81a531d2a2
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/core/hardware.py
@@ -0,0 +1,94 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import contextlib
+import functools
+import os
+from pathlib import Path
+from typing import Any, Callable, Optional, Type, cast
+from unittest import TestCase
+from unittest.mock import MagicMock, call
+
+from roadtest import ENV_WORK_DIR
+
+from .control import ControlWriter
+from .opslog import OpsLogReader
+from .sysfs import write_int
+
+
+class HwMock(MagicMock):
+    def assert_reg_write_once(self, test: TestCase, reg: int, value: int) -> None:
+        test.assertEqual(
+            [c for c in self.mock_calls if c.args[0] == reg],
+            [call.reg_write(reg, value)],
+        )
+
+    def assert_last_reg_write(self, test: TestCase, reg: int, value: int) -> None:
+        test.assertEqual(
+            [c for c in self.mock_calls if c.args[0] == reg][-1:],
+            [call.reg_write(reg, value)],
+        )
+
+    def get_last_reg_write(self, reg: int) -> int:
+        return cast(int, [c for c in self.mock_calls if c.args[0] == reg][-1].args[1])
+
+
+class Hardware(contextlib.AbstractContextManager):
+    def __init__(self, bus: str, work: Optional[Path] = None) -> None:
+        if not work:
+            work = Path(os.environ[ENV_WORK_DIR])
+
+        self.bus = bus
+        self.mock = HwMock()
+        self.control = ControlWriter(work)
+        self.opslog = OpsLogReader(work)
+        self.loaded_model = False
+
+        # Ignore old entries
+        self.opslog.read_next()
+
+    def _call(self, method: str, *args: Any, **kwargs: Any) -> None:
+        self.control.write_cmd(
+            f"backend.{self.bus}.{method}(*{str(args)}, **{str(kwargs)})"
+        )
+
+    def kick(self) -> None:
+        # Control writes are only applied when the backend gets something
+        # to process, usually because the driver tried to access the device.
+        # But in some cases, such as when the driver is waiting for a
+        # sequence of interrupts, the test code needs the control write to take
+        # effect immediately.  For this, we just need to kick the backend
+        # into processing its control queue.
+        #
+        # We (ab)use gpio-leds for this.  devicetree.py sets up the device.
+        write_int(Path("/sys/class/leds/led0/brightness"), 0)
+
+    def load_model(self, cls: Type[Any], *args: Any, **kwargs: Any) -> "Hardware":
+        self._call("load_model", cls.__module__, cls.__name__, *args, **kwargs)
+        self.loaded_model = True
+        return self
+
+    def __enter__(self) -> "Hardware":
+        return self
+
+    def __exit__(self, *_: Any) -> None:
+        self.close()
+
+    @functools.cache
+    def __getattr__(self, name: str) -> Callable:
+        def func(*args: Any, **kwargs: Any) -> None:
+            self._call(name, *args, **kwargs)
+
+        return func
+
+    def close(self) -> None:
+        if self.loaded_model:
+            self._call("unload_model")
+        self.control.close()
+
+    def update_mock(self) -> HwMock:
+        opslog = self.opslog.read_next()
+        for line in opslog:
+            eval(line, {"mock": self.mock})
+
+        return self.mock
diff --git a/tools/testing/roadtest/roadtest/core/log.py b/tools/testing/roadtest/roadtest/core/log.py
new file mode 100644
index 000000000000..7d73e40eb2d8
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/core/log.py
@@ -0,0 +1,42 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+from pathlib import Path
+
+
+class LogParser:
+    DNF_MESSAGE = "<Test did not finish cleanly>"
+
+    def __init__(self, file: Path):
+        try:
+            raw = file.read_text()
+            lines = raw.splitlines()
+        except FileNotFoundError:
+            lines = []
+            raw = ""
+
+        self.raw = raw
+        self.lines = lines
+
+    def has_any(self) -> bool:
+        return "START<" in self.raw
+
+    def get_testcase_log(self, id: str) -> list[str]:
+        startmarker = f"START<{id}>"
+        stopmarker = f"STOP<{id}>"
+
+        try:
+            startpos = next(
+                i for i, line in enumerate(self.lines) if startmarker in line
+            )
+        except StopIteration:
+            return []
+
+        try:
+            stoppos = next(
+                i for i, line in enumerate(self.lines[startpos:]) if stopmarker in line
+            )
+        except StopIteration:
+            return self.lines[startpos + 1 :] + [LogParser.DNF_MESSAGE]
+
+        return self.lines[startpos + 1 : startpos + stoppos]
diff --git a/tools/testing/roadtest/roadtest/core/modules.py b/tools/testing/roadtest/roadtest/core/modules.py
new file mode 100644
index 000000000000..5bd2d92a322b
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/core/modules.py
@@ -0,0 +1,38 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import os
+import subprocess
+from pathlib import Path
+from typing import Any
+
+from roadtest import ENV_BUILD_DIR
+
+
+def modprobe(modname: str, remove: bool = False) -> None:
+    moddir = Path(os.environ[ENV_BUILD_DIR]) / "modules"
+    args = []
+    if remove:
+        args.append("--remove")
+    args += [f"--dirname={moddir}", modname]
+    subprocess.check_output(["/sbin/modprobe"] + args)
+
+
+def insmod(modname: str) -> None:
+    modprobe(modname)
+
+
+def rmmod(modname: str) -> None:
+    subprocess.check_output(["/sbin/rmmod", modname])
+
+
+class Module:
+    def __init__(self, name: str) -> None:
+        self.name = name
+
+    def __enter__(self) -> "Module":
+        modprobe(self.name)
+        return self
+
+    def __exit__(self, *_: Any) -> None:
+        rmmod(self.name)
diff --git a/tools/testing/roadtest/roadtest/core/opslog.py b/tools/testing/roadtest/roadtest/core/opslog.py
new file mode 100644
index 000000000000..83bb4f525d03
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/core/opslog.py
@@ -0,0 +1,35 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import os
+from pathlib import Path
+
+OPSLOG_FILE = "opslog.txt"
+
+
+class OpsLogWriter:
+    def __init__(self, work: Path) -> None:
+        path = work / OPSLOG_FILE
+        path.unlink(missing_ok=True)
+        self.file = open(path, "a", buffering=1)
+
+    def write(self, line: str) -> None:
+        self.file.write(line + "\n")
+
+
+class OpsLogReader:
+    def __init__(self, work: Path) -> None:
+        self.path = work / OPSLOG_FILE
+        self.opslogpos = 0
+
+    def read_next(self) -> list[str]:
+        # There is a problem in hostfs (see Hostfs Caveats) which means
+        # that reads from UML on a file which is extended on the host don't see
+        # the new data unless we open and close the file, so we can't open once
+        # and use readlines().
+        with open(self.path, "r") as f:
+            os.lseek(f.fileno(), self.opslogpos, os.SEEK_SET)
+            opslog = [line.rstrip() for line in f.readlines()]
+            self.opslogpos = os.lseek(f.fileno(), 0, os.SEEK_CUR)
+
+        return opslog
diff --git a/tools/testing/roadtest/roadtest/core/proxy.py b/tools/testing/roadtest/roadtest/core/proxy.py
new file mode 100644
index 000000000000..36089e21d7d5
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/core/proxy.py
@@ -0,0 +1,48 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+from typing import Any
+from unittest import TestCase, TextTestResult
+
+from . import control
+
+
+class ProxyTextTestResult(TextTestResult):
+    def __init__(self, stream: Any, descriptions: Any, verbosity: Any) -> None:
+        super().__init__(stream, descriptions, verbosity)
+        self.successes: list[tuple[TestCase, str]] = []
+
+        # Print via kmsg to avoid getting cut off by other kernel prints.
+        self.kmsg = open("/dev/kmsg", "w", buffering=1)
+        self.control = control.ControlWriter()
+
+    def addSuccess(self, test: TestCase) -> None:
+        super().addSuccess(test)
+        self.successes.append((test, ""))
+
+    def _log(self, test: TestCase, action: str) -> None:
+        line = f"{action}<{test.id()}>"
+        self.kmsg.write(line + "\n")
+        self.control.write_log(line)
+
+    def startTest(self, test: TestCase) -> None:
+        self._log(test, "START")
+        super().startTest(test)
+
+    def stopTest(self, test: TestCase) -> None:
+        super().stopTest(test)
+        self._log(test, "STOP")
+
+    def _replace_id(self, reslist: list[tuple[TestCase, str]]) -> list[tuple[str, str]]:
+        return [(case.id(), tb) for case, tb in reslist]
+
+    def to_proxy(self) -> dict[str, Any]:
+        return {
+            "testsRun": self.testsRun,
+            "wasSuccessful": self.wasSuccessful(),
+            "successes": self._replace_id(self.successes),
+            "errors": self._replace_id(self.errors),
+            "failures": self._replace_id(self.failures),
+            "skipped": self._replace_id(self.skipped),
+            "unexpectedSuccesses": [t.id() for t in self.unexpectedSuccesses],
+        }
diff --git a/tools/testing/roadtest/roadtest/core/suite.py b/tools/testing/roadtest/roadtest/core/suite.py
new file mode 100644
index 000000000000..e99a60b4faba
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/core/suite.py
@@ -0,0 +1,286 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import json
+import os
+import shlex
+import signal
+import subprocess
+import textwrap
+import unittest
+from pathlib import Path
+from typing import Any, ClassVar, Optional, Tuple, cast
+from unittest import TestResult
+
+from roadtest import ENV_BUILD_DIR, ENV_WORK_DIR
+
+from . import devicetree
+from .log import LogParser
+
+
+class UMLTestCase(unittest.TestCase):
+    run_separately: ClassVar[bool] = False
+    dts: ClassVar[Optional[devicetree.DtFragment]] = None
+
+
+class UMLSuite(unittest.TestSuite):
+    def __init__(
+        self,
+        timeout: int,
+        workdir: str,
+        builddir: str,
+        ksrcdir: str,
+        uml_args_pre: list[str],
+        uml_args_post: list[str],
+        shell: bool,
+        *args: Any,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+
+        self.timeout = timeout
+        self.workdir = Path(workdir).resolve()
+        self.builddir = Path(builddir)
+        self.ksrcdir = Path(ksrcdir)
+        self.uml_args_pre = uml_args_pre
+        self.uml_args_post = uml_args_post
+        self.shell = shell
+
+        self.backendlog = self.workdir / "backend.txt"
+        self.umllog = self.workdir / "uml.txt"
+
+        # Used from the roadtest.cmd.remote running inside UML
+        self.testfile = self.workdir / "tests.json"
+        self.resultfile = self.workdir / "results.json"
+
+    def run(
+        self, result: unittest.TestResult, debug: bool = False
+    ) -> unittest.TestResult:
+        pwd = os.getcwd()
+
+        os.makedirs(self.workdir, exist_ok=True)
+        workdir = self.workdir
+
+        tests = cast(list[UMLTestCase], list(self))
+
+        os.environ[ENV_WORK_DIR] = str(workdir)
+        os.environ[ENV_BUILD_DIR] = str(self.builddir)
+
+        dt = devicetree.Devicetree(workdir=workdir, ksrcdir=self.ksrcdir)
+        dt.assemble([test.dts for test in tests if test.dts])
+        dt.compile("test.dtb")
+
+        testinfos = []
+        ids = []
+        for t in tests:
+            id = t.id()
+            # This fixup is needed when discover is done starting from "roadtest"
+            if not id.startswith("roadtest."):
+                id = f"roadtest.{id}"
+            ids.append(id)
+
+            testinfos.append({"id": id, "values": t.dts.values if t.dts else {}})
+
+        with self.testfile.open("w") as f:
+            json.dump(testinfos, f)
+
+        uml_args = [
+            str(self.builddir / "vmlinux"),
+            f"PYTHONPATH={pwd}",
+            f"{ENV_WORK_DIR}={workdir}",
+            f"{ENV_BUILD_DIR}={self.builddir}",
+            # Should be enough for anybody?
+            "mem=64M",
+            "dtb=test.dtb",
+            "rootfstype=hostfs",
+            "rw",
+            f"init={pwd}/init.sh",
+            f"uml_dir={workdir}",
+            "umid=uml",
+            # ProxyTextTestResult writes to /dev/kmsg
+            "printk.devkmsg=on",
+            "slub_debug",
+            # For ease of debugging
+            "no_hash_pointers",
+        ]
+
+        if self.shell:
+            # See init.sh
+            uml_args += ["ROADTEST_SHELL=1"]
+        else:
+            # Set by slub_debug
+            TAINT_BAD_PAGE = 1 << 5
+            uml_args += [
+                # init.sh increases the loglevel after bootup.
+                "quiet",
+                "panic_on_warn=1",
+                f"panic_on_taint={TAINT_BAD_PAGE}",
+                "oops=panic",
+                # Speeds up delays, but as a consequence also causes
+                # 100% CPU consumption at an idle shell prompt.
+                "time-travel",
+            ]
+
+        main_script = (Path(__file__).parent / "../backend/main.py").resolve()
+
+        args = (
+            [
+                str(self.builddir / "roadtest-backend"),
+                # The socket locations are also present in the devicetree.
+                f"--gpio-socket={workdir}/gpio.sock",
+                f"--i2c-socket={workdir}/i2c.sock",
+                f"--main-script={main_script}",
+                "--",
+            ]
+            + self.uml_args_pre
+            + uml_args
+            + self.uml_args_post
+        )
+
+        print(
+            "Running backend/UML with: {}".format(
+                " ".join([shlex.quote(a) for a in args])
+            )
+        )
+
+        # Truncate instead of deleting so that tail -f can be used to monitor
+        # the log across runs.
+        self.backendlog.write_text("")
+        self.umllog.write_text("")
+        self.resultfile.unlink(missing_ok=True)
+
+        umlpidfile = workdir / "uml/pid"
+        umlpidfile.unlink(missing_ok=True)
+
+        newenv = dict(os.environ, PYTHONPATH=pwd)
+
+        try:
+            process = None
+            with self.backendlog.open("w") as f:
+                process = subprocess.Popen(
+                    args,
+                    env=newenv,
+                    stdin=subprocess.PIPE,
+                    stdout=f,
+                    stderr=subprocess.STDOUT,
+                    text=True,
+                    preexec_fn=os.setsid,
+                )
+                process.wait(self.timeout if self.timeout else None)
+        except subprocess.TimeoutExpired:
+            pass
+        finally:
+            try:
+                if process:
+                    os.killpg(process.pid, signal.SIGKILL)
+            except ProcessLookupError:
+                pass
+            try:
+                pid = int(umlpidfile.read_text())
+                os.killpg(pid, signal.SIGKILL)
+            except (FileNotFoundError, ProcessLookupError):
+                pass
+
+        if process and process.returncode is not None and process.returncode != 0:
+            with self.backendlog.open("a") as f:
+                f.write(f"<Backend exited with error code {process.returncode}>\n")
+
+        try:
+            with self.resultfile.open("r") as f:
+                proxy = json.load(f)
+        except FileNotFoundError:
+            # UML crashed, timed out, etc
+            proxy = None
+
+        return self._convert_results(proxy, tests, result)
+
+    def _parse_status(self, id: str, proxy: dict) -> Tuple[str, str]:
+        if not proxy:
+            return "ERROR", "No result.  UML or backend crashed?\n"
+
+        try:
+            _, tb = next(e for e in proxy["successes"] if e[0] == id)
+            return "ok", ""
+        except StopIteration:
+            pass
+
+        try:
+            _, tb = next(e for e in proxy["errors"] if e[0] == id)
+            return "ERROR", tb
+        except StopIteration:
+            pass
+
+        try:
+            _, tb = next(e for e in proxy["failures"] if e[0] == id)
+            return "FAIL", tb
+        except StopIteration:
+            pass
+
+        # setupClass, etc
+        if proxy["errors"]:
+            _, tb = proxy["errors"][0]
+            return "ERROR", tb
+
+        raise Exception("Unable to parse status")
+
+    def _get_log(
+        self, name: str, parser: LogParser, id: str, full_if_none: bool
+    ) -> Optional[str]:
+        testloglines = parser.get_testcase_log(id)
+        tb = None
+        if testloglines:
+            tb = "\n".join([f"{name} log:"] + [" " + line for line in testloglines])
+        elif full_if_none and not parser.has_any():
+            if parser.raw:
+                tb = "\n".join(
+                    [f"Full {name} log:", textwrap.indent(parser.raw, " ").rstrip()]
+                )
+            else:
+                tb = f"\nNo {name} log found."
+
+        return tb
+
+    def _convert_results(
+        self,
+        proxy: dict,
+        tests: list[UMLTestCase],
+        result: TestResult,
+    ) -> TestResult:
+        umllog = LogParser(self.umllog)
+        backendlog = LogParser(self.backendlog)
+
+        first_fail = True
+        for test in tests:
+            assert isinstance(test, unittest.TestCase)
+
+            id = test.id()
+            if not id.startswith("roadtest."):
+                id = f"roadtest.{id}"
+
+            status, tb = self._parse_status(id, proxy)
+            if status != "ok":
+                parts = []
+
+                backendtb = self._get_log("Backend", backendlog, id, first_fail)
+                if backendtb:
+                    parts.append(backendtb)
+
+                umltb = self._get_log("UML", umllog, id, first_fail)
+                if umltb:
+                    parts.append(umltb)
+
+                # In the case of no START/STOP markers at all in the logs, we include
+                # the full logs, but only do it in the first failing test case to
+                # reduce noise.
+                first_fail = False
+                tb = "\n\n".join(parts + [tb])
+
+            if status == "ERROR":
+                result.errors.append((test, tb))
+            elif status == "FAIL":
+                result.failures.append((test, tb))
+
+            print(f"{test} ... {status}")
+            result.testsRun += 1
+
+        return result
diff --git a/tools/testing/roadtest/roadtest/core/sysfs.py b/tools/testing/roadtest/roadtest/core/sysfs.py
new file mode 100644
index 000000000000..64228978718e
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/core/sysfs.py
@@ -0,0 +1,77 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import contextlib
+from pathlib import Path
+from typing import Iterator
+
+
+# Path.write_text() is inappropriate since Python calls write(2)
+# a second time if the first one returns an error, if the file
+# was opened as text.
+def write_str(path: Path, val: str) -> None:
+    path.write_bytes(val.encode())
+
+
+def write_int(path: Path, val: int) -> None:
+    write_str(path, str(val))
+
+
+def write_float(path: Path, val: float) -> None:
+    write_str(path, str(val))
+
+
+def read_str(path: Path) -> str:
+    return path.read_text().rstrip()
+
+
+def read_int(path: Path) -> int:
+    return int(read_str(path))
+
+
+def read_float(path: Path) -> float:
+    return float(read_str(path))
+
+
+class I2CDevice:
+    def __init__(self, addr: int, bus: int = 0) -> None:
+        self.id = f"{bus}-{addr:04x}"
+        self.path = Path(f"/sys/bus/i2c/devices/{self.id}")
+
+
+class PlatformDevice:
+    def __init__(self, name: str) -> None:
+        self.id = name
+        self.path = Path(f"/sys/bus/platform/devices/{self.id}")
+
+
+class I2CDriver:
+    def __init__(self, driver: str) -> None:
+        self.driver = driver
+        self.path = Path(f"/sys/bus/i2c/drivers/{driver}")
+
+    @contextlib.contextmanager
+    def bind(self, addr: int, bus: int = 0) -> Iterator[I2CDevice]:
+        dev = I2CDevice(addr, bus)
+        write_str(self.path / "bind", dev.id)
+
+        try:
+            yield dev
+        finally:
+            write_str(self.path / "unbind", dev.id)
+
+
+class PlatformDriver:
+    def __init__(self, driver: str) -> None:
+        self.driver = driver
+        self.path = Path(f"/sys/bus/platform/drivers/{driver}")
+
+    @contextlib.contextmanager
+    def bind(self, addr: str) -> Iterator[PlatformDevice]:
+        dev = PlatformDevice(addr)
+        write_str(self.path / "bind", dev.id)
+
+        try:
+            yield dev
+        finally:
+            write_str(self.path / "unbind", dev.id)
diff --git a/tools/testing/roadtest/roadtest/core/test_control.py b/tools/testing/roadtest/roadtest/core/test_control.py
new file mode 100644
index 000000000000..a8cf9105eb52
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/core/test_control.py
@@ -0,0 +1,35 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from unittest import TestCase
+
+from .control import ControlReader, ControlWriter
+
+
+class TestControl(TestCase):
+    def test_control(self) -> None:
+        with TemporaryDirectory() as tmpdir:
+            work = Path(tmpdir)
+            reader = ControlReader(work)
+            writer = ControlWriter(work)
+
+            values = []
+
+            def append(new: int) -> None:
+                nonlocal values
+                values.append(new)
+
+            vars = {"append": append}
+            writer.write_cmd("append(1)")
+
+            reader.process(vars)
+            self.assertEqual(values, [1])
+
+            writer.write_cmd("append(2)")
+            writer.write_log("append(4)")
+            writer.write_cmd("append(3)")
+
+            reader.process(vars)
+            self.assertEqual(values, [1, 2, 3])
diff --git a/tools/testing/roadtest/roadtest/core/test_devicetree.py b/tools/testing/roadtest/roadtest/core/test_devicetree.py
new file mode 100644
index 000000000000..db61fd24b39a
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/core/test_devicetree.py
@@ -0,0 +1,31 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+import tempfile
+import unittest
+from pathlib import Path
+
+from . import devicetree
+
+
+class TestDevicetree(unittest.TestCase):
+    def test_compile(self) -> None:
+        with tempfile.TemporaryDirectory() as tmp:
+            tmpdir = Path(tmp)
+            # We don't have the ksrcdir so we can't test if includes work.
+            dt = devicetree.Devicetree(tmpdir, tmpdir)
+
+            dt.assemble(
+                [
+                    devicetree.DtFragment(
+                        src="""
+&i2c {
+    foo = <1>;
+};
+            """
+                    )
+                ]
+            )
+            dt.compile("test.dtb")
+            dtb = tmpdir / "test.dtb"
+            self.assertTrue((dtb).exists())
diff --git a/tools/testing/roadtest/roadtest/core/test_hardware.py b/tools/testing/roadtest/roadtest/core/test_hardware.py
new file mode 100644
index 000000000000..eb09b317e258
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/core/test_hardware.py
@@ -0,0 +1,41 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from unittest import TestCase
+
+from roadtest.backend.mock import MockBackend
+
+from .hardware import Hardware
+
+
+class TestHardware(TestCase):
+    def test_mock(self) -> None:
+        with TemporaryDirectory() as tmpdir:
+            work = Path(tmpdir)
+
+            backend = MockBackend(work)
+            hw = Hardware(bus="dummy", work=work)
+
+            backend.reg_write(0x1, 0xDEAD)
+            backend.reg_write(0x2, 0xBEEF)
+            mock = hw.update_mock()
+            mock.assert_reg_write_once(self, 0x1, 0xDEAD)
+
+            backend.reg_write(0x1, 0xCAFE)
+            mock = hw.update_mock()
+            with self.assertRaises(AssertionError):
+                mock.assert_reg_write_once(self, 0x1, 0xDEAD)
+
+            mock.assert_last_reg_write(self, 0x1, 0xCAFE)
+
+            self.assertEqual(mock.get_last_reg_write(0x1), 0xCAFE)
+            self.assertEqual(mock.get_last_reg_write(0x2), 0xBEEF)
+
+            with self.assertRaises(IndexError):
+                self.assertEqual(mock.get_last_reg_write(0x3), 0x0)
+
+            mock.reset_mock()
+            with self.assertRaises(AssertionError):
+                mock.assert_last_reg_write(self, 0x2, 0xBEEF)
diff --git a/tools/testing/roadtest/roadtest/core/test_log.py b/tools/testing/roadtest/roadtest/core/test_log.py
new file mode 100644
index 000000000000..6988ff4419db
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/core/test_log.py
@@ -0,0 +1,54 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+from pathlib import Path
+from tempfile import NamedTemporaryFile
+from unittest import TestCase
+
+from .log import LogParser
+
+
+class TestLog(TestCase):
+    def test_parser(self) -> None:
+        with NamedTemporaryFile() as tmpfile:
+            path = Path(tmpfile.name)
+
+            path.write_text(
+                """
+xyz START<finished>
+finished1
+finished2
+STOP<finished>
+START<empty>
+STOP<empty>
+START<foo> monkey STOP<foo>
+START<unfinished>
+unfinished1
+unfinished2"""
+            )
+
+            parser = LogParser(path)
+            self.assertEqual(
+                parser.get_testcase_log("finished"), ["finished1", "finished2"]
+            )
+
+            self.assertEqual(
+                parser.get_testcase_log("unfinished"),
+                ["unfinished1", "unfinished2", LogParser.DNF_MESSAGE],
+            )
+
+            self.assertEqual(
+                parser.get_testcase_log("notpresent"),
+                [],
+            )
+
+            self.assertEqual(
+                parser.get_testcase_log("enpty"),
+                [],
+            )
+
+            # Shouldn't happen since we print from the kernel?
+            self.assertEqual(
+                parser.get_testcase_log("foo"),
+                [],
+            )
diff --git a/tools/testing/roadtest/roadtest/core/test_opslog.py b/tools/testing/roadtest/roadtest/core/test_opslog.py
new file mode 100644
index 000000000000..bd594c587032
--- /dev/null
+++ b/tools/testing/roadtest/roadtest/core/test_opslog.py
@@ -0,0 +1,27 @@ 
+# SPDX-License-Identifier: GPL-2.0-only
+# Copyright Axis Communications AB
+
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from unittest import TestCase
+
+from .opslog import OpsLogReader, OpsLogWriter
+
+
+class TestOpsLOg(TestCase):
+    def test_opslog(self) -> None:
+        with TemporaryDirectory() as tmpdir:
+            work = Path(tmpdir)
+            writer = OpsLogWriter(work)
+            reader = OpsLogReader(work)
+
+            self.assertEqual(reader.read_next(), [])
+
+            writer.write("1")
+            writer.write("2")
+
+            self.assertEqual(reader.read_next(), ["1", "2"])
+            self.assertEqual(reader.read_next(), [])
+
+            writer.write("3")
+            self.assertEqual(reader.read_next(), ["3"])
diff --git a/tools/testing/roadtest/roadtest/tests/__init__.py b/tools/testing/roadtest/roadtest/tests/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1