@@ -59,7 +59,8 @@ class TestResult:
return self.steps
class TestSuiteReport():
- _testsuite = []
+ def __init__(self):
+ self._testsuite = []
def add_resultdata(self, result_data):
if isinstance(result_data, TestResult):
@@ -3,46 +3,65 @@ import signal
from string import Template
import subprocess
import time
+from multiprocessing import Pool
from functools import cached_property
from TdcPlugin import TdcPlugin
from tdc_config import *
+def prepare_suite(obj, test):
+ original = obj.args.NAMES
+
+ if 'skip' in test and test['skip'] == 'yes':
+ return
+
+ if 'nsPlugin' not in test['plugins']:
+ return
+
+ shadow = {}
+ shadow['IP'] = original['IP']
+ shadow['TC'] = original['TC']
+ shadow['NS'] = '{}-{}'.format(original['NS'], test['random'])
+ shadow['DEV0'] = '{}id{}'.format(original['DEV0'], test['id'])
+ shadow['DEV1'] = '{}id{}'.format(original['DEV1'], test['id'])
+ shadow['DUMMY'] = '{}id{}'.format(original['DUMMY'], test['id'])
+ shadow['DEV2'] = original['DEV2']
+ obj.args.NAMES = shadow
+
+ if obj.args.namespace:
+ obj._ns_create()
+ else:
+ obj._ports_create()
+
+ # Make sure the netns is visible in the fs
+ while True:
+ obj._proc_check()
+ try:
+ ns = obj.args.NAMES['NS']
+ f = open('/run/netns/{}'.format(ns))
+ f.close()
+ break
+ except:
+ time.sleep(0.1)
+ continue
+
+ obj.args.NAMES = original
+
class SubPlugin(TdcPlugin):
def __init__(self):
self.sub_class = 'ns/SubPlugin'
super().__init__()
def pre_suite(self, testcount, testlist):
+ from itertools import cycle
+
super().pre_suite(testcount, testlist)
print("Setting up namespaces and devices...")
- original = self.args.NAMES
-
- for t in testlist:
- if 'skip' in t and t['skip'] == 'yes':
- continue
-
- if 'nsPlugin' not in t['plugins']:
- continue
-
- shadow = {}
- shadow['IP'] = original['IP']
- shadow['TC'] = original['TC']
- shadow['NS'] = '{}-{}'.format(original['NS'], t['random'])
- shadow['DEV0'] = '{}id{}'.format(original['DEV0'], t['id'])
- shadow['DEV1'] = '{}id{}'.format(original['DEV1'], t['id'])
- shadow['DUMMY'] = '{}id{}'.format(original['DUMMY'], t['id'])
- shadow['DEV2'] = original['DEV2']
- self.args.NAMES = shadow
-
- if self.args.namespace:
- self._ns_create()
- else:
- self._ports_create()
-
- self.args.NAMES = original
+ with Pool(self.args.mp) as p:
+ it = zip(cycle([self]), testlist)
+ p.starmap(prepare_suite, it)
def pre_case(self, caseinfo, test_skip):
if self.args.verbose:
@@ -51,16 +70,6 @@ class SubPlugin(TdcPlugin):
if test_skip:
return
- # Make sure the netns is visible in the fs
- while True:
- self._proc_check()
- try:
- ns = self.args.NAMES['NS']
- f = open('/run/netns/{}'.format(ns))
- f.close()
- break
- except:
- continue
def post_case(self):
if self.args.verbose:
@@ -17,6 +17,7 @@ import subprocess
import time
import traceback
import random
+from multiprocessing import Pool
from collections import OrderedDict
from string import Template
@@ -477,26 +478,11 @@ def run_one_test(pm, args, index, tidx):
return res
-def test_runner(pm, args, filtered_tests):
- """
- Driver function for the unit tests.
-
- Prints information about the tests being run, executes the setup and
- teardown commands and the command under test itself. Also determines
- success/failure based on the information in the test case and generates
- TAP output accordingly.
- """
- testlist = filtered_tests
+def prepare_run(pm, args, testlist):
tcount = len(testlist)
- index = 1
- tap = ''
- badtest = None
- stage = None
emergency_exit = False
emergency_exit_message = ''
- tsr = TestSuiteReport()
-
try:
pm.call_pre_suite(tcount, testlist)
except Exception as ee:
@@ -506,14 +492,37 @@ def test_runner(pm, args, filtered_tests):
traceback.print_tb(ex_tb)
emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
emergency_exit = True
- stage = 'pre-SUITE'
if emergency_exit:
- pm.call_post_suite(index)
+ pm.call_post_suite(1)
return emergency_exit_message
- if args.verbose > 1:
+
+ if args.verbose:
print('give test rig 2 seconds to stabilize')
+
time.sleep(2)
+
+def purge_run(pm, index):
+ pm.call_post_suite(index)
+
+def test_runner(pm, args, filtered_tests):
+ """
+ Driver function for the unit tests.
+
+ Prints information about the tests being run, executes the setup and
+ teardown commands and the command under test itself. Also determines
+ success/failure based on the information in the test case and generates
+ TAP output accordingly.
+ """
+ testlist = filtered_tests
+ tcount = len(testlist)
+ index = 1
+ tap = ''
+ badtest = None
+ stage = None
+
+ tsr = TestSuiteReport()
+
for tidx in testlist:
if "flower" in tidx["category"] and args.device == None:
errmsg = "Tests using the DEV2 variable must define the name of a "
@@ -576,7 +585,68 @@ def test_runner(pm, args, filtered_tests):
if input(sys.stdin):
print('got something on stdin')
- pm.call_post_suite(index)
+ return (index, tsr)
+
+def mp_bins(alltests):
+ serial = []
+ parallel = []
+
+ for test in alltests:
+ if 'nsPlugin' not in test['plugins']:
+ serial.append(test)
+ else:
+ # We can only create one netdevsim device at a time
+ if 'netdevsim/new_device' in str(test['setup']):
+ serial.append(test)
+ else:
+ parallel.append(test)
+
+ return (serial, parallel)
+
+def __mp_runner(tests):
+ (_, tsr) = test_runner(mp_pm, mp_args, tests)
+ return tsr._testsuite
+
+def test_runner_mp(pm, args, alltests):
+ prepare_run(pm, args, alltests)
+
+ (serial, parallel) = mp_bins(alltests)
+
+ batches = [parallel[n : n + 32] for n in range(0, len(parallel), 32)]
+ batches.insert(0, serial)
+
+ print("Executing {} tests in parallel and {} in serial".format(len(parallel), len(serial)))
+ print("Using {} batches".format(len(batches)))
+
+ # We can't pickle these objects so workaround them
+ global mp_pm
+ mp_pm = pm
+
+ global mp_args
+ mp_args = args
+
+ with Pool(args.mp) as p:
+ pres = p.map(__mp_runner, batches)
+
+ tsr = TestSuiteReport()
+ for trs in pres:
+ for res in trs:
+ tsr.add_resultdata(res)
+
+ # Passing an index is not useful in MP
+ purge_run(pm, None)
+
+ return tsr
+
+def test_runner_serial(pm, args, alltests):
+ prepare_run(pm, args, alltests)
+
+ if args.verbose:
+ print("Executing {} tests in serial".format(len(alltests)))
+
+ (index, tsr) = test_runner(pm, args, alltests)
+
+ purge_run(pm, index)
return tsr
@@ -605,12 +675,15 @@ def load_from_file(filename):
k['filename'] = filename
return testlist
+def identity(string):
+ return string
def args_parse():
"""
Create the argument parser.
"""
parser = argparse.ArgumentParser(description='Linux TC unit tests')
+ parser.register('type', None, identity)
return parser
@@ -668,6 +741,9 @@ def set_args(parser):
parser.add_argument(
'-P', '--pause', action='store_true',
help='Pause execution just before post-suite stage')
+ parser.add_argument(
+ '-J', '--multiprocess', type=int, default=1, dest='mp',
+ help='Run tests in parallel whenever possible')
return parser
@@ -888,7 +964,12 @@ def set_operation_mode(pm, parser, args, remaining):
except PluginDependencyException as pde:
print('The following plugins were not found:')
print('{}'.format(pde.missing_pg))
- catresults = test_runner(pm, args, alltests)
+
+ if args.mp > 1:
+ catresults = test_runner_mp(pm, args, alltests)
+ else:
+ catresults = test_runner_serial(pm, args, alltests)
+
if catresults.count_failures() != 0:
exit_code = 1 # KSFT_FAIL
if args.format == 'none':