@@ -107,6 +107,99 @@ class test(common_test.base_test):
logging.warning('Please verify %s for more info', dir)
+class subtest_loader(test):
+ """
+ Class specialized in loading and running subtests.
+
+ It is used for more complex tests, that encompass a large amount of
+ subtests, such as the KVM tests. If you plan on using this, create
+ a subdir 'tests' inside your test dir, and for each test you want to
+ create, make sure the .py file is called [test_name].py and this
+ file implements at least a function called run_[test_name].
+
+ The function run_once is already implemented, and other tests might want
+ to implement the methods:
+
+ environment_setup()
+ preprocess()
+ postprocess_on_error()
+ postprocess()
+ on_error()
+ """
+ def environment_setup(self):
+ pass
+
+
+ def preprocess(self):
+ pass
+
+
+ def postprocess_on_error(self):
+ pass
+
+
+ def postprocess(self):
+ pass
+
+
+ def on_error(self):
+ pass
+
+
+ def run_once(self, params):
+ self.params = params
+ # Report the parameters we've received and write them as keyvals
+ logging.debug("Test parameters:")
+ keys = self.params.keys()
+ keys.sort()
+ for key in keys:
+ logging.debug(" %s = %s", key, self.params[key])
+ self.write_test_keyval({key: self.params[key]})
+
+ self.environment_setup()
+ test_passed = False
+
+ try:
+ try:
+ try:
+ # Get the test routine corresponding to the specified
+ # test type
+ t_type = params.get("type")
+ # Verify if we have the correspondent source file for it
+ subtest_dir = os.path.join(self.bindir, "tests")
+ module_path = os.path.join(subtest_dir, "%s.py" % t_type)
+ if not os.path.isfile(module_path):
+ raise error.TestError("No %s.py test file found" %
+ t_type)
+ # Load the test module
+ f, p, d = imp.find_module(t_type, [subtest_dir])
+ test_module = imp.load_module(t_type, f, p, d)
+ f.close()
+
+ self.preprocess()
+
+ # Run the test function
+ run_func = getattr(test_module, "run_%s" % t_type)
+ try:
+ run_func(self, self.params, env)
+ finally:
+ kvm_utils.dump_env(env, env_filename)
+ test_passed = True
+
+ except Exception, e:
+ logging.error("Test failed: %s: %s",
+ e.__class__.__name__, e)
+
+ self.postprocess_on_error()
+
+ finally:
+ self.postprocess()
+
+ except Exception, e:
+ self.on_error()
+
+
+
def runtest(job, url, tag, args, dargs):
common_test.runtest(job, url, tag, args, dargs, locals(), globals(),
job.sysinfo.log_before_each_test,
@@ -4,7 +4,7 @@ from autotest_lib.client.common_lib import error
import kvm_utils, kvm_preprocessing
-class kvm(test.test):
+class kvm(test.subtest_loader):
"""
Suite of KVM virtualization functional tests.
Contains tests for testing both KVM kernel code and userspace code.
@@ -23,15 +23,7 @@ class kvm(test.test):
version = 1
env_version = 0
- def run_once(self, params):
- # Report the parameters we've received and write them as keyvals
- logging.debug("Test parameters:")
- keys = params.keys()
- keys.sort()
- for key in keys:
- logging.debug(" %s = %s", key, params[key])
- self.write_test_keyval({key: params[key]})
-
+ def environment_setup(self):
# Set the log file dir for the logging mechanism used by kvm_subprocess
# (this must be done before unpickling env)
kvm_utils.set_log_file_dir(self.debugdir)
@@ -39,78 +31,54 @@ class kvm(test.test):
# Open the environment file
logging.info("Unpickling env. You may see some harmless error "
"messages.")
- env_filename = os.path.join(self.bindir, params.get("env", "env"))
- env = kvm_utils.load_env(env_filename, self.env_version)
- logging.debug("Contents of environment: %s", env)
+ self.env_filename = os.path.join(self.bindir, self.params.get("env",
+ "env"))
+ self.env = kvm_utils.load_env(env_filename, self.env_version)
+ logging.debug("Contents of environment: %s", self.env)
- test_passed = False
+ def preprocess(self):
+ # Preprocess
try:
- try:
- try:
- # Get the test routine corresponding to the specified
- # test type
- t_type = params.get("type")
- # Verify if we have the correspondent source file for it
- subtest_dir = os.path.join(self.bindir, "tests")
- module_path = os.path.join(subtest_dir, "%s.py" % t_type)
- if not os.path.isfile(module_path):
- raise error.TestError("No %s.py test file found" %
- t_type)
- # Load the test module
- f, p, d = imp.find_module(t_type, [subtest_dir])
- test_module = imp.load_module(t_type, f, p, d)
- f.close()
+ kvm_preprocessing.preprocess(self, self.params, self.env)
+ finally:
+ kvm_utils.dump_env(self.env, self.env_filename)
- # Preprocess
- try:
- kvm_preprocessing.preprocess(self, params, env)
- finally:
- kvm_utils.dump_env(env, env_filename)
- # Run the test function
- run_func = getattr(test_module, "run_%s" % t_type)
- try:
- run_func(self, params, env)
- finally:
- kvm_utils.dump_env(env, env_filename)
- test_passed = True
- except Exception, e:
- logging.error("Test failed: %s: %s",
- e.__class__.__name__, e)
- try:
- kvm_preprocessing.postprocess_on_error(
- self, params, env)
- finally:
- kvm_utils.dump_env(env, env_filename)
- raise
+ def postprocess_on_error(self):
+ try:
+ kvm_preprocessing.postprocess_on_error(self, self.params, self.env)
+ finally:
+ kvm_utils.dump_env(self.env, self.env_filename)
+ raise
- finally:
- # Postprocess
- try:
- try:
- kvm_preprocessing.postprocess(self, params, env)
- except Exception, e:
- if test_passed:
- raise
- logging.error("Exception raised during "
- "postprocessing: %s", e)
- finally:
- kvm_utils.dump_env(env, env_filename)
- logging.debug("Contents of environment: %s", env)
+ def postprocess(self):
+ # Postprocess
+ try:
+ kvm_preprocessing.postprocess(self, self.params, self.env)
except Exception, e:
- if params.get("abort_on_error") != "yes":
+ if test_passed:
raise
- # Abort on error
- logging.info("Aborting job (%s)", e)
- for vm in kvm_utils.env_get_all_vms(env):
- if vm.is_dead():
- continue
- logging.info("VM '%s' is alive.", vm.name)
- for m in vm.monitors:
- logging.info("'%s' has a %s monitor unix socket at: %s",
- vm.name, m.protocol, m.filename)
- logging.info("The command line used to start '%s' was:\n%s",
- vm.name, vm.make_qemu_command())
- raise error.JobError("Abort requested (%s)" % e)
+ logging.error("Exception raised during postprocessing: %s",
+ e)
+ finally:
+ kvm_utils.dump_env(self.env, self.env_filename)
+ logging.debug("Contents of environment: %s", self.env)
+
+
+ def on_error(self):
+ if self.params.get("abort_on_error", "no") != "yes":
+ raise
+ # Abort on error
+ logging.info("Aborting job (%s)", e)
+ for vm in kvm_utils.env_get_all_vms(self.env):
+ if vm.is_dead():
+ continue
+ logging.info("VM '%s' is alive.", vm.name)
+ for m in vm.monitors:
+ logging.info("'%s' has a %s monitor unix socket at: %s",
+ vm.name, m.protocol, m.filename)
+ logging.info("The command line used to start '%s' was:\n%s",
+ vm.name, vm.make_qemu_command())
+ raise error.JobError("Abort requested (%s)" % e)