diff mbox series

[v2,22/25] selftests: tcp_authopt: Initial tests for l3mdev handling

Message ID aa93b9dade20a827f26e63417eab6ac4184de94c.1635784253.git.cdleonard@gmail.com (mailing list archive)
State New
Headers show
Series [v2,01/25] tcp: authopt: Initial support and key management | expand

Commit Message

Leonard Crestez Nov. 1, 2021, 4:34 p.m. UTC
These tests also verify functionality in TCP-MD5 and unsigned traffic
modes. They were used to find the issue fixed in commit 86f1e3a8489f
("tcp: md5: Fix overlap between vrf and non-vrf keys")

Signed-off-by: Leonard Crestez <cdleonard@gmail.com>
---
 .../tcp_authopt_test/linux_tcp_authopt.py     |   9 +
 .../tcp_authopt_test/test_vrf_bind.py         | 492 ++++++++++++++++++
 .../tcp_authopt_test/vrf_netns_fixture.py     | 127 +++++
 3 files changed, 628 insertions(+)
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_vrf_bind.py
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/vrf_netns_fixture.py
diff mbox series

Patch

diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py
index 75cf5f993ccb..2a720d49cba2 100644
--- a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py
@@ -38,10 +38,11 @@  class TCP_AUTHOPT_FLAG(IntFlag):
 
 class TCP_AUTHOPT_KEY_FLAG(IntFlag):
     DEL = BIT(0)
     EXCLUDE_OPTS = BIT(1)
     BIND_ADDR = BIT(2)
+    IFINDEX = BIT(3)
 
 
 class TCP_AUTHOPT_ALG(IntEnum):
     HMAC_SHA_1_96 = 1
     AES_128_CMAC_96 = 2
@@ -102,25 +103,31 @@  class tcp_authopt_key:
         recv_id: int = 0,
         alg=TCP_AUTHOPT_ALG.HMAC_SHA_1_96,
         key: KeyArgType = b"",
         addr: AddrArgType = None,
         auto_flags: bool = True,
+        ifindex: typing.Optional[int] = None,
         include_options=None,
     ):
         self.flags = flags
         self.send_id = send_id
         self.recv_id = recv_id
         self.alg = alg
         self.key = key
+        self.ifindex = ifindex
         self.addr = addr
         self.auto_flags = auto_flags
         if include_options is not None:
             self.include_options = include_options
 
     def get_real_flags(self) -> TCP_AUTHOPT_KEY_FLAG:
         result = self.flags
         if self.auto_flags:
+            if self.ifindex is not None:
+                result |= TCP_AUTHOPT_KEY_FLAG.IFINDEX
+            else:
+                result &= ~TCP_AUTHOPT_KEY_FLAG.IFINDEX
             if self.addr is not None:
                 result |= TCP_AUTHOPT_KEY_FLAG.BIND_ADDR
             else:
                 result &= ~TCP_AUTHOPT_KEY_FLAG.BIND_ADDR
         return result
@@ -136,10 +143,12 @@  class tcp_authopt_key:
             self.alg,
             len(self.key),
             self.key,
         )
         data += bytes(self.addrbuf.ljust(sockaddr_storage.sizeof, b"\x00"))
+        if self.ifindex is not None:
+            data += bytes(struct.pack("I", self.ifindex))
         return data
 
     def __bytes__(self):
         return self.pack()
 
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_vrf_bind.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_vrf_bind.py
new file mode 100644
index 000000000000..da43ac8842e5
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_vrf_bind.py
@@ -0,0 +1,492 @@ 
+# SPDX-License-Identifier: GPL-2.0
+"""Test VRF overlap behavior
+
+With tcp_l3mdev_accept single server should be able to differentiate multiple
+clients with same IP coming from different VRFs.
+"""
+import errno
+import logging
+import socket
+from contextlib import ExitStack
+
+import pytest
+
+from . import linux_tcp_md5sig
+from .conftest import parametrize_product, skipif_missing_tcp_authopt
+from .linux_tcp_authopt import (
+    set_tcp_authopt_key,
+    set_tcp_authopt_key_kwargs,
+    tcp_authopt_key,
+)
+from .server import SimpleServerThread
+from .utils import (
+    DEFAULT_TCP_SERVER_PORT,
+    check_socket_echo,
+    create_client_socket,
+    create_listen_socket,
+)
+from .vrf_netns_fixture import VrfNamespaceFixture
+
+logger = logging.getLogger(__name__)
+
+
+class VrfFixture:
+    """Fixture for VRF testing
+
+    Single server has two interfaces with same IP addr: one inside VRF and one
+    outside. Two clients two namespaces have same client IP, one connected to
+    VRF and one outside.
+    """
+
+    def __init__(
+        self,
+        address_family=socket.AF_INET,
+        tcp_l3mdev_accept=1,
+        init_default_listen_socket=True,
+    ):
+        self.address_family = address_family
+        self.tcp_l3mdev_accept = tcp_l3mdev_accept
+        self.init_default_listen_socket = init_default_listen_socket
+
+    @property
+    def server_addr(self):
+        if self.address_family == socket.AF_INET:
+            return self.nsfixture.server_ipv4_addr
+        else:
+            return self.nsfixture.server_ipv6_addr
+
+    @property
+    def client_addr(self):
+        if self.address_family == socket.AF_INET:
+            return self.nsfixture.client_ipv4_addr
+        else:
+            return self.nsfixture.client_ipv6_addr
+
+    @property
+    def server_addr_port(self):
+        return (str(self.server_addr), DEFAULT_TCP_SERVER_PORT)
+
+    @property
+    def vrf1_ifindex(self):
+        return self.nsfixture.server_vrf1_ifindex
+
+    @property
+    def vrf2_ifindex(self):
+        return self.nsfixture.server_vrf2_ifindex
+
+    def create_listen_socket(self, **kw):
+        result = create_listen_socket(
+            family=self.address_family,
+            ns=self.nsfixture.server_netns_name,
+            bind_addr=self.server_addr,
+            **kw
+        )
+        self.exit_stack.enter_context(result)
+        return result
+
+    def create_client_socket(self, ns):
+        result = create_client_socket(
+            ns=ns, family=self.address_family, bind_addr=self.client_addr
+        )
+        self.exit_stack.enter_context(result)
+        return result
+
+    def __enter__(self):
+        self.exit_stack = ExitStack()
+        self.exit_stack.__enter__()
+        self.nsfixture = self.exit_stack.enter_context(
+            VrfNamespaceFixture(tcp_l3mdev_accept=self.tcp_l3mdev_accept)
+        )
+
+        self.server_thread = SimpleServerThread(mode="echo")
+        if self.init_default_listen_socket:
+            self.listen_socket = self.create_listen_socket()
+            self.server_thread.add_listen_socket(self.listen_socket)
+        self.exit_stack.enter_context(self.server_thread)
+        return self
+
+    def __exit__(self, *args):
+        self.exit_stack.__exit__(*args)
+
+
+@pytest.mark.parametrize("address_family", [socket.AF_INET, socket.AF_INET6])
+def test_vrf_overlap_unsigned(exit_stack: ExitStack, address_family):
+    """Test without any signature support"""
+    fix = VrfFixture(address_family)
+    exit_stack.enter_context(fix)
+
+    client_socket0 = fix.create_client_socket(fix.nsfixture.client1_netns_name)
+    client_socket1 = fix.create_client_socket(fix.nsfixture.client1_netns_name)
+    client_socket2 = fix.create_client_socket(fix.nsfixture.client2_netns_name)
+
+    client_socket2.connect(fix.server_addr_port)
+    client_socket1.connect(fix.server_addr_port)
+    client_socket0.connect(fix.server_addr_port)
+    check_socket_echo(client_socket1)
+    check_socket_echo(client_socket2)
+    check_socket_echo(client_socket0)
+    check_socket_echo(client_socket1)
+    check_socket_echo(client_socket2)
+    check_socket_echo(client_socket0)
+    check_socket_echo(client_socket2)
+
+
+KEY0 = b"00000"
+KEY1 = b"1"
+KEY2 = b"22"
+
+
+def set_server_md5(fix, key=KEY0, **kw):
+    linux_tcp_md5sig.setsockopt_md5sig_kwargs(
+        fix.listen_socket, key=key, addr=fix.client_addr, **kw
+    )
+
+
+def set_server_md5_key0(fix, key=KEY0):
+    return set_server_md5(fix, key=key)
+
+
+def set_server_md5_key1(fix, key=KEY1):
+    return set_server_md5(fix, key=key, ifindex=fix.vrf1_ifindex)
+
+
+def set_server_md5_key2(fix, key=KEY2):
+    return set_server_md5(fix, key=key, ifindex=fix.vrf2_ifindex)
+
+
+def set_client_md5_key(fix, client_socket, key):
+    linux_tcp_md5sig.setsockopt_md5sig_kwargs(
+        client_socket, key=key, addr=fix.server_addr
+    )
+
+
+@pytest.mark.parametrize("address_family", [socket.AF_INET, socket.AF_INET6])
+def test_vrf_overlap_md5_samekey(exit_stack: ExitStack, address_family):
+    """Test overlapping keys that are identical"""
+    fix = VrfFixture(address_family)
+    exit_stack.enter_context(fix)
+    set_server_md5_key0(fix, b"same")
+    set_server_md5_key1(fix, b"same")
+    set_server_md5_key2(fix, b"same")
+    client_socket0 = fix.create_client_socket(fix.nsfixture.client0_netns_name)
+    client_socket1 = fix.create_client_socket(fix.nsfixture.client1_netns_name)
+    client_socket2 = fix.create_client_socket(fix.nsfixture.client2_netns_name)
+    set_client_md5_key(fix, client_socket0, b"same")
+    set_client_md5_key(fix, client_socket1, b"same")
+    set_client_md5_key(fix, client_socket2, b"same")
+    client_socket0.connect(fix.server_addr_port)
+    client_socket1.connect(fix.server_addr_port)
+    client_socket2.connect(fix.server_addr_port)
+    check_socket_echo(client_socket1)
+    check_socket_echo(client_socket2)
+    check_socket_echo(client_socket0)
+
+
+@pytest.mark.parametrize("address_family", [socket.AF_INET, socket.AF_INET6])
+def test_vrf_overlap12_md5(exit_stack: ExitStack, address_family):
+    """Test overlapping keys between vrfs"""
+    fix = VrfFixture(address_family)
+    exit_stack.enter_context(fix)
+    set_server_md5_key1(fix)
+    set_server_md5_key2(fix)
+    client_socket1 = fix.create_client_socket(fix.nsfixture.client1_netns_name)
+    client_socket2 = fix.create_client_socket(fix.nsfixture.client2_netns_name)
+    set_client_md5_key(fix, client_socket1, KEY1)
+    set_client_md5_key(fix, client_socket2, KEY2)
+    client_socket1.connect(fix.server_addr_port)
+    client_socket2.connect(fix.server_addr_port)
+    check_socket_echo(client_socket1)
+    check_socket_echo(client_socket2)
+
+
+@pytest.mark.parametrize("address_family", [socket.AF_INET, socket.AF_INET6])
+def test_vrf_overlap01_md5(exit_stack: ExitStack, address_family):
+    """Test overlapping keys inside and outside vrf, VRF key added second"""
+    fix = VrfFixture(address_family)
+    exit_stack.enter_context(fix)
+    set_server_md5_key0(fix)
+    set_server_md5_key1(fix)
+    client_socket0 = fix.create_client_socket(fix.nsfixture.client0_netns_name)
+    client_socket1 = fix.create_client_socket(fix.nsfixture.client1_netns_name)
+    set_client_md5_key(fix, client_socket0, KEY0)
+    set_client_md5_key(fix, client_socket1, KEY1)
+    client_socket1.connect(fix.server_addr_port)
+    client_socket0.connect(fix.server_addr_port)
+    check_socket_echo(client_socket0)
+    check_socket_echo(client_socket1)
+
+
+@pytest.mark.parametrize("address_family", [socket.AF_INET, socket.AF_INET6])
+def test_vrf_overlap10_md5(exit_stack: ExitStack, address_family):
+    """Test overlapping keys inside and outside vrf, VRF key added first"""
+    fix = VrfFixture(address_family)
+    exit_stack.enter_context(fix)
+    set_server_md5_key1(fix)
+    set_server_md5_key0(fix)
+    client_socket0 = fix.create_client_socket(fix.nsfixture.client0_netns_name)
+    client_socket1 = fix.create_client_socket(fix.nsfixture.client1_netns_name)
+    set_client_md5_key(fix, client_socket0, KEY0)
+    set_client_md5_key(fix, client_socket1, KEY1)
+    client_socket1.connect(fix.server_addr_port)
+    client_socket0.connect(fix.server_addr_port)
+    check_socket_echo(client_socket0)
+    check_socket_echo(client_socket1)
+
+
+@pytest.mark.parametrize("address_family", [socket.AF_INET])
+def test_vrf_overlap_md5_prefix(exit_stack: ExitStack, address_family):
+    """VRF keys should take precedence even if prefixlen is low"""
+    fix = VrfFixture(address_family)
+    exit_stack.enter_context(fix)
+    set_server_md5(fix, key=b"fail", prefixlen=16)
+    set_server_md5(
+        fix, key=b"pass", ifindex=fix.nsfixture.server_vrf1_ifindex, prefixlen=1
+    )
+    set_server_md5(fix, key=b"fail", prefixlen=24)
+
+    # connect via VRF
+    client_socket = fix.create_client_socket(fix.nsfixture.client1_netns_name)
+    set_client_md5_key(fix, client_socket, b"pass")
+    client_socket.connect(fix.server_addr_port)
+
+
+class TestVRFOverlapAOBoundKeyPrecedence:
+    """Keys bound to VRF should take precedence over unbound keys.
+
+    KEY0 is unbound (accepts all vrfs)
+    KEY1 is bound to vrf1
+    """
+
+    fix: VrfFixture
+
+    @pytest.fixture(
+        autouse=True,
+        scope="class",
+        params=[socket.AF_INET, socket.AF_INET6],
+    )
+    def init(self, request: pytest.FixtureRequest):
+        address_family = request.param
+        logger.info("init address_family=%s", address_family)
+        with ExitStack() as exit_stack:
+            fix = exit_stack.enter_context(VrfFixture(address_family))
+            set_tcp_authopt_key_kwargs(
+                fix.listen_socket,
+                key=KEY0,
+                ifindex=None,
+            )
+            set_tcp_authopt_key_kwargs(
+                fix.listen_socket,
+                key=KEY1,
+                ifindex=fix.vrf1_ifindex,
+            )
+            self.__class__.fix = fix
+            yield
+        logger.info("done address_family=%s", address_family)
+
+    def test_vrf1_key0(self):
+        client_socket = self.fix.create_client_socket(
+            self.fix.nsfixture.client1_netns_name
+        )
+        set_tcp_authopt_key_kwargs(client_socket, key=KEY0)
+        with pytest.raises(socket.timeout):
+            client_socket.connect(self.fix.server_addr_port)
+
+    def test_vrf1_key1(self):
+        client_socket = self.fix.create_client_socket(
+            self.fix.nsfixture.client1_netns_name
+        )
+        set_tcp_authopt_key_kwargs(client_socket, key=KEY1)
+        client_socket.connect(self.fix.server_addr_port)
+
+    def test_vrf2_key0(self):
+        client_socket = self.fix.create_client_socket(
+            self.fix.nsfixture.client2_netns_name
+        )
+        set_tcp_authopt_key_kwargs(client_socket, key=KEY0)
+        client_socket.connect(self.fix.server_addr_port)
+
+    def test_vrf2_key1(self):
+        client_socket = self.fix.create_client_socket(
+            self.fix.nsfixture.client2_netns_name
+        )
+        set_tcp_authopt_key_kwargs(client_socket, key=KEY1)
+        with pytest.raises(socket.timeout):
+            client_socket.connect(self.fix.server_addr_port)
+
+
+def assert_raises_enoent(func):
+    with pytest.raises(OSError) as e:
+        func()
+    assert e.value.errno == errno.ENOENT
+
+
+def test_vrf_overlap_md5_del_0110():
+    """Removing keys should not raise ENOENT because they are distinct"""
+    with VrfFixture() as fix:
+        set_server_md5(fix, key=KEY0)
+        set_server_md5(fix, key=KEY1, ifindex=fix.vrf1_ifindex)
+        set_server_md5(fix, key=b"", ifindex=fix.vrf1_ifindex)
+        set_server_md5(fix, key=b"")
+        assert_raises_enoent(lambda: set_server_md5(fix, key=b""))
+
+
+def test_vrf_overlap_md5_del_1001():
+    """Removing keys should not raise ENOENT because they are distinct"""
+    with VrfFixture() as fix:
+        set_server_md5(fix, key=KEY1, ifindex=fix.vrf1_ifindex)
+        set_server_md5(fix, key=KEY0)
+        set_server_md5(fix, key=b"")
+        set_server_md5(fix, key=b"", ifindex=fix.vrf1_ifindex)
+        assert_raises_enoent(lambda: set_server_md5(fix, key=b""))
+
+
+def test_vrf_overlap_md5_del_1010():
+    """Removing keys should not raise ENOENT because they are distinct"""
+    with VrfFixture() as fix:
+        set_server_md5(fix, key=KEY1, ifindex=fix.vrf1_ifindex)
+        set_server_md5(fix, key=KEY0)
+        set_server_md5(fix, key=b"", ifindex=fix.vrf1_ifindex)
+        set_server_md5(fix, key=b"")
+        assert_raises_enoent(lambda: set_server_md5(fix, key=b""))
+
+
+@skipif_missing_tcp_authopt
+@pytest.mark.parametrize("address_family", [socket.AF_INET, socket.AF_INET6])
+def test_vrf_overlap_ao_samekey(exit_stack: ExitStack, address_family):
+    """Single server serving both VRF and non-VRF client with same password.
+
+    This requires no special support from TCP-AO.
+    """
+    fix = VrfFixture(address_family)
+    exit_stack.enter_context(fix)
+    set_tcp_authopt_key(fix.listen_socket, tcp_authopt_key(key="11111"))
+
+    client_socket1 = fix.create_client_socket(fix.nsfixture.client1_netns_name)
+    client_socket2 = fix.create_client_socket(fix.nsfixture.client2_netns_name)
+
+    set_tcp_authopt_key(client_socket1, tcp_authopt_key(key="11111"))
+    set_tcp_authopt_key(client_socket2, tcp_authopt_key(key="11111"))
+    client_socket1.connect(fix.server_addr_port)
+    client_socket2.connect(fix.server_addr_port)
+    check_socket_echo(client_socket1)
+    check_socket_echo(client_socket2)
+    check_socket_echo(client_socket1)
+    check_socket_echo(client_socket2)
+
+
+@skipif_missing_tcp_authopt
+@pytest.mark.parametrize("address_family", [socket.AF_INET, socket.AF_INET6])
+def test_vrf_overlap_ao(exit_stack: ExitStack, address_family):
+    """Single server serving both VRF and non-VRF client with different passwords
+
+    This requires kernel to handle ifindex
+    """
+    fix = VrfFixture(address_family)
+    exit_stack.enter_context(fix)
+    set_tcp_authopt_key(
+        fix.listen_socket,
+        tcp_authopt_key(key=KEY0, ifindex=0),
+    )
+    set_tcp_authopt_key(
+        fix.listen_socket,
+        tcp_authopt_key(key=KEY1, ifindex=fix.vrf1_ifindex),
+    )
+    set_tcp_authopt_key(
+        fix.listen_socket,
+        tcp_authopt_key(key=KEY2, ifindex=fix.vrf2_ifindex),
+    )
+
+    client_socket0 = fix.create_client_socket(fix.nsfixture.client0_netns_name)
+    client_socket1 = fix.create_client_socket(fix.nsfixture.client1_netns_name)
+    client_socket2 = fix.create_client_socket(fix.nsfixture.client2_netns_name)
+    set_tcp_authopt_key(client_socket0, tcp_authopt_key(key=KEY0))
+    set_tcp_authopt_key(client_socket1, tcp_authopt_key(key=KEY1))
+    set_tcp_authopt_key(client_socket2, tcp_authopt_key(key=KEY2))
+    client_socket0.connect(fix.server_addr_port)
+    client_socket1.connect(fix.server_addr_port)
+    client_socket2.connect(fix.server_addr_port)
+    check_socket_echo(client_socket0)
+    check_socket_echo(client_socket1)
+    check_socket_echo(client_socket2)
+    check_socket_echo(client_socket0)
+    check_socket_echo(client_socket1)
+    check_socket_echo(client_socket2)
+
+
+@parametrize_product(
+    address_family=(socket.AF_INET, socket.AF_INET6),
+    tcp_l3mdev_accept=(0, 1),
+    bind_key_to_vrf=(0, 1),
+)
+def test_md5_pervrf(
+    exit_stack: ExitStack, address_family, tcp_l3mdev_accept, bind_key_to_vrf
+):
+    """Test one VRF-bound socket.
+
+    Since the socket is already bound to the vrf binding the key should not be required.
+    """
+    fix = VrfFixture(
+        address_family,
+        tcp_l3mdev_accept=tcp_l3mdev_accept,
+        init_default_listen_socket=False,
+    )
+    exit_stack.enter_context(fix)
+    listen_socket1 = fix.create_listen_socket(bind_device="veth1")
+    linux_tcp_md5sig.setsockopt_md5sig_kwargs(
+        listen_socket1,
+        key=KEY1,
+        addr=fix.client_addr,
+        ifindex=fix.vrf1_ifindex if bind_key_to_vrf else None,
+    )
+    fix.server_thread.add_listen_socket(listen_socket1)
+    client_socket1 = fix.create_client_socket(fix.nsfixture.client1_netns_name)
+    set_client_md5_key(fix, client_socket1, KEY1)
+    client_socket1.connect(fix.server_addr_port)
+    check_socket_echo(client_socket1)
+
+
+@pytest.mark.parametrize(
+    "address_family",
+    (socket.AF_INET, socket.AF_INET6),
+)
+def test_vrf_overlap_md5_pervrf(exit_stack: ExitStack, address_family):
+    """Test overlapping via per-VRF sockets"""
+    fix = VrfFixture(
+        address_family,
+        tcp_l3mdev_accept=0,
+        init_default_listen_socket=False,
+    )
+    exit_stack.enter_context(fix)
+    listen_socket0 = fix.create_listen_socket()
+    listen_socket1 = fix.create_listen_socket(bind_device="veth1")
+    listen_socket2 = fix.create_listen_socket(bind_device="veth2")
+    linux_tcp_md5sig.setsockopt_md5sig_kwargs(
+        listen_socket0,
+        key=KEY0,
+        addr=fix.client_addr,
+    )
+    linux_tcp_md5sig.setsockopt_md5sig_kwargs(
+        listen_socket1,
+        key=KEY1,
+        addr=fix.client_addr,
+    )
+    linux_tcp_md5sig.setsockopt_md5sig_kwargs(
+        listen_socket2,
+        key=KEY2,
+        addr=fix.client_addr,
+    )
+    fix.server_thread.add_listen_socket(listen_socket0)
+    fix.server_thread.add_listen_socket(listen_socket1)
+    fix.server_thread.add_listen_socket(listen_socket2)
+    client_socket0 = fix.create_client_socket(fix.nsfixture.client0_netns_name)
+    client_socket1 = fix.create_client_socket(fix.nsfixture.client1_netns_name)
+    client_socket2 = fix.create_client_socket(fix.nsfixture.client2_netns_name)
+    set_client_md5_key(fix, client_socket0, KEY0)
+    set_client_md5_key(fix, client_socket1, KEY1)
+    set_client_md5_key(fix, client_socket2, KEY2)
+    client_socket0.connect(fix.server_addr_port)
+    client_socket1.connect(fix.server_addr_port)
+    client_socket2.connect(fix.server_addr_port)
+    check_socket_echo(client_socket1)
+    check_socket_echo(client_socket1)
+    check_socket_echo(client_socket0)
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/vrf_netns_fixture.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/vrf_netns_fixture.py
new file mode 100644
index 000000000000..ff9c0959a268
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/vrf_netns_fixture.py
@@ -0,0 +1,127 @@ 
+# SPDX-License-Identifier: GPL-2.0
+import subprocess
+from ipaddress import IPv4Address, IPv6Address
+
+
+def ip_link_get_ifindex(dev: str, prefix: str = "") -> int:
+    out = subprocess.check_output(
+        f"{prefix}ip -o link show {dev}", text=True, shell=True
+    )
+    return int(out.split(":", 1)[0])
+
+
+def get_ipv4_addr(ns=1, index=1) -> IPv4Address:
+    return IPv4Address("10.10.0.0") + (ns << 8) + index
+
+
+def get_ipv6_addr(ns=1, index=1) -> IPv6Address:
+    return IPv6Address("fd00::") + (ns << 16) + index
+
+
+class VrfNamespaceFixture:
+    """Namespace fixture for VRF testing.
+
+    Single server has two interfaces with same IP addr: one inside VRF and one
+    outside.
+
+    Two clients two namespaces have same client IP, one connected to VRF and one
+    outside.
+    """
+
+    tcp_l3mdev_accept = 1
+
+    server_netns_name = "tcp_authopt_test_server"
+    client0_netns_name = "tcp_authopt_test_client0"
+    client1_netns_name = "tcp_authopt_test_client1"
+    client2_netns_name = "tcp_authopt_test_client2"
+
+    # 02:* means "locally administered"
+    server_veth0_mac_addr = "02:00:00:01:00:00"
+    server_veth1_mac_addr = "02:00:00:01:00:01"
+    server_veth2_mac_addr = "02:00:00:01:00:02"
+    client0_mac_addr = "02:00:00:02:00:00"
+    client1_mac_addr = "02:00:00:02:01:00"
+    client2_mac_addr = "02:00:00:02:02:00"
+
+    server_ipv4_addr = get_ipv4_addr(1, 1)
+    server_ipv6_addr = get_ipv6_addr(1, 1)
+    client_ipv4_addr = get_ipv4_addr(2, 1)
+    client_ipv6_addr = get_ipv6_addr(2, 1)
+
+    def __init__(self, **kw):
+        import os
+
+        import pytest
+
+        from .conftest import raise_skip_no_netns
+
+        raise_skip_no_netns()
+        if not os.path.exists("/proc/sys/net/ipv4/tcp_l3mdev_accept"):
+            pytest.skip(
+                "missing tcp_l3mdev_accept, is CONFIG_NET_L3_MASTER_DEV enabled?)"
+            )
+        for k, v in kw.items():
+            setattr(self, k, v)
+
+    def get_server_ifindex(self, dev):
+        return ip_link_get_ifindex(dev, f"ip netns exec {self.server_netns_name} ")
+
+    def __enter__(self):
+        self._del_netns()
+        script = f"""
+set -e
+ip netns add {self.server_netns_name}
+ip netns add {self.client0_netns_name}
+ip netns add {self.client1_netns_name}
+ip netns add {self.client2_netns_name}
+# Enable tcp_l3mdev unconditionally
+ip netns exec {self.server_netns_name} sysctl -q net.ipv4.tcp_l3mdev_accept={int(self.tcp_l3mdev_accept)}
+ip link add veth0 netns {self.server_netns_name} type veth peer name veth0 netns {self.client0_netns_name}
+ip link add veth1 netns {self.server_netns_name} type veth peer name veth0 netns {self.client1_netns_name}
+ip link add veth2 netns {self.server_netns_name} type veth peer name veth0 netns {self.client2_netns_name}
+ip link add vrf1 netns {self.server_netns_name} type vrf table 1000
+ip link add vrf2 netns {self.server_netns_name} type vrf table 2000
+ip -n {self.server_netns_name} link set vrf1 up
+ip -n {self.server_netns_name} link set vrf2 up
+ip -n {self.server_netns_name} link set veth1 vrf vrf1
+ip -n {self.server_netns_name} link set veth2 vrf vrf2
+ip -n {self.server_netns_name} link set veth0 up addr {self.server_veth0_mac_addr}
+ip -n {self.server_netns_name} link set veth1 up addr {self.server_veth1_mac_addr}
+ip -n {self.server_netns_name} link set veth2 up addr {self.server_veth2_mac_addr}
+ip -n {self.server_netns_name} addr add {self.server_ipv4_addr}/16 dev veth0
+ip -n {self.server_netns_name} addr add {self.server_ipv6_addr}/64 dev veth0 nodad
+ip -n {self.server_netns_name} addr add {self.server_ipv4_addr}/16 dev veth1
+ip -n {self.server_netns_name} addr add {self.server_ipv6_addr}/64 dev veth1 nodad
+ip -n {self.server_netns_name} addr add {self.server_ipv4_addr}/16 dev veth2
+ip -n {self.server_netns_name} addr add {self.server_ipv6_addr}/64 dev veth2 nodad
+ip -n {self.client0_netns_name} link set veth0 up addr {self.client0_mac_addr}
+ip -n {self.client0_netns_name} addr add {self.client_ipv4_addr}/16 dev veth0
+ip -n {self.client0_netns_name} addr add {self.client_ipv6_addr}/64 dev veth0 nodad
+ip -n {self.client1_netns_name} link set veth0 up addr {self.client1_mac_addr}
+ip -n {self.client1_netns_name} addr add {self.client_ipv4_addr}/16 dev veth0
+ip -n {self.client1_netns_name} addr add {self.client_ipv6_addr}/64 dev veth0 nodad
+ip -n {self.client2_netns_name} link set veth0 up addr {self.client2_mac_addr}
+ip -n {self.client2_netns_name} addr add {self.client_ipv4_addr}/16 dev veth0
+ip -n {self.client2_netns_name} addr add {self.client_ipv6_addr}/64 dev veth0 nodad
+"""
+        subprocess.run(script, shell=True, check=True)
+        self.server_veth0_ifindex = self.get_server_ifindex("veth0")
+        self.server_veth1_ifindex = self.get_server_ifindex("veth1")
+        self.server_veth2_ifindex = self.get_server_ifindex("veth2")
+        self.server_vrf1_ifindex = self.get_server_ifindex("vrf1")
+        self.server_vrf2_ifindex = self.get_server_ifindex("vrf2")
+        return self
+
+    def _del_netns(self):
+        script = f"""\
+set -e
+for ns in {self.server_netns_name} {self.client0_netns_name} {self.client1_netns_name} {self.client2_netns_name}; do
+    if ip netns list | grep -q "$ns"; then
+        ip netns del "$ns"
+    fi
+done
+"""
+        subprocess.run(script, shell=True, check=True)
+
+    def __exit__(self, *a):
+        self._del_netns()