[AUTOTEST,3/3] autotest: Client/server part unification.

Message ID 1314342724-28309-4-git-send-email-jzupka@redhat.com (mailing list archive)
State New, archived

Commit Message

Jiri Zupka Aug. 26, 2011, 7:12 a.m. UTC
Add the ability to start autotest on other systems over the network from the
client part of autotest, the same way the server part does.

This patch adds the ability to start autotest tests on another system over the
network the same way server_job does in the server part of autotest. It removes
the need to write some tests multiple times for different environments (virt
tests, client-part tests, etc.).

Usage:
    class subtest(test.test):
        version = 1

        def run_once(self, test_name, test_args):
            self.job.extend_to_server_job()
            guest = hosts.create_host("192.168.122.130")
            guest2 = hosts.create_host("192.168.122.88")
            at = autotest.Autotest(guest)
            at2 = autotest.Autotest(guest2)

            template = ''.join(["job.run_test('sleeptest', tag='%s', ",
                                "iterations=%d)"])
            guest_control = template % ("test", 10)
            guest_control2 = template % ("test", 1)

            def a1():
                at2.run(guest_control, guest2.hostname, background=False,
                        tag="one")

            def a2():
                at.run(guest_control2, guest.hostname, background=True,
                       tag="two")

    1) To start two independent tests:
         t = virt_utils.Thread(self.job.parallel, [[a2]])
         t2 = virt_utils.Thread(self.job.parallel, [[a1]])

    2) To start two parallel tests without waiting for them to finish:
         t = virt_utils.Thread(self.job.parallel, [[a2], [a1]])

    3) To start two parallel tests and wait for them to finish (a complete
       sketch follows this list):
         self.job.parallel([a2], [a1])
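
Putting the pieces together, here is a minimal end-to-end sketch of option 3.
It only rearranges the usage above; the guest address and the sleeptest
control line are illustrative, the import paths are a best guess from the
files this patch touches, and this is a sketch rather than a verified control
file:

    from autotest_lib.client.bin import test
    from autotest_lib.client.common_lib import autotest, hosts

    class subtest(test.test):
        version = 1

        def run_once(self, test_name, test_args):
            # Switch this client job into server-capable mode first.
            self.job.extend_to_server_job()

            # Illustrative guest address; use any reachable host.
            guest = hosts.create_host("192.168.122.130")
            at = autotest.Autotest(guest)
            control = "job.run_test('sleeptest', tag='test', iterations=1)"

            def run_remote():
                at.run(control, guest.hostname, background=False, tag="one")

            # Run the remote client and wait for it to finish.
            self.job.parallel([run_remote])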

There is a small problem with keeping compatibility between the standard
client-part job and the server part: the indentation of logs. This is solved
by using self.job.parallel to start autotest.

There is also a problem with starting autotest on a system where another
client is already running, because client/job cleans directories and erases
data for all jobs quite aggressively. This problem can be resolved easily.
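
For orientation before the diff: a condensed sketch of the state bookkeeping
the patch uses to let a client job host nested client runs, based on
preprocess_client_state() and clean_state() in client/bin/job.py below. The
standalone function form and names here are illustrative, not the patch's
literal code:

    def stash_client_state(state, sub_state):
        # The first nested run moves 'client' aside to 'client-s'; later
        # concurrent runs just bump a reference count.
        if state.has_namespace('client-s'):
            sub_state.set('autotests', 'count',
                          sub_state.get('autotests', 'count') + 1)
        else:
            state.rename_namespace('client', 'client-s')
            sub_state.set('autotests', 'count', 1)

    def restore_client_state(state, sub_state):
        # When the last nested run finishes, drop its 'client' namespace
        # and put the stashed one back.
        if state.has_namespace('client-s'):
            if sub_state.get('autotests', 'count') > 1:
                sub_state.set('autotests', 'count',
                              sub_state.get('autotests', 'count') - 1)
            else:
                if state.has_namespace('client'):
                    state.discard_namespace('client')
                state.rename_namespace('client-s', 'client')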

Signed-off-by: Jiří Župka <jzupka@redhat.com>
---
 client/bin/job.py                            |  143 +++++++++++++++++++++++++-
 client/bin/profilers.py                      |    5 +
 client/common_lib/autotest.py                |   42 ++++----
 client/common_lib/base_hosts/__init__.py     |   10 ++-
 client/common_lib/base_hosts/base_classes.py |   24 +++--
 client/common_lib/base_job.py                |  117 +++++++++++++++++++++-
 client/common_lib/hosts/monitors/console.py  |    2 +-
 server/autotest_unittest.py                  |    2 +
 server/server_job.py                         |   75 --------------
 server/tests/netperf2/netperf2.py            |    2 +
 10 files changed, 314 insertions(+), 108 deletions(-)

Patch

diff --git a/client/bin/job.py b/client/bin/job.py
index 1abdbcd..97bc32b 100644
--- a/client/bin/job.py
+++ b/client/bin/job.py
@@ -8,6 +8,7 @@  Copyright Andy Whitcroft, Martin J. Bligh 2006
 import copy, os, platform, re, shutil, sys, time, traceback, types, glob
 import logging, getpass, errno, weakref
 import cPickle as pickle
+import tempfile, fcntl
 from autotest_lib.client.bin import client_logging_config
 from autotest_lib.client.bin import utils, parallel, kernel, xen
 from autotest_lib.client.bin import profilers, boottool, harness
@@ -71,6 +72,17 @@  class status_indenter(base_job.status_indenter):
         self.job._record_indent -= 1
 
 
+    def get_context(self):
+        """Returns a context object for use by job.get_record_context."""
+        class context(object):
+            def __init__(self, indenter, indent):
+                self._indenter = indenter
+                self._indent = indent
+            def restore(self):
+                self._indenter._indent = self._indent
+        return context(self, self.job._record_indent)
+
+
 class base_client_job(base_job.base_job):
     """The client-side concrete implementation of base_job.
 
@@ -120,6 +132,56 @@  class base_client_job(base_job.base_job):
             raise
 
 
+    def use_external_logging(self):
+        """
+        Return True if external logging should be used.
+        """
+        return False
+
+
+    def extend_to_server_job(self, ssh_user="root", ssh_pass="", ssh_port=22,
+                             only_collect_crashinfo=False):
+        """
+        Extend the client job so that it can also act as the server part.
+        """
+        self._uncollected_log_file = None
+        created_uncollected_logs = False
+        if self.resultdir:
+            if only_collect_crashinfo:
+                # crashinfo-only run: no uncollected logs file is created,
+                # so there is nothing to set up here
+                logging.info("No existing uncollected logs, "
+                             "skipping crashinfo collection")
+            else:
+                self._uncollected_log_file = os.path.join(self.resultdir,
+                                                          'uncollected_logs')
+                log_file = open(self._uncollected_log_file, "w")
+                pickle.dump([], log_file)
+                log_file.close()
+                created_uncollected_logs = True
+
+        from autotest_lib.client.common_lib import hosts
+        from autotest_lib.client.common_lib import autotest
+        hosts.factory.ssh_user = ssh_user
+        hosts.factory.ssh_port = ssh_port
+        hosts.factory.ssh_pass = ssh_pass
+        hosts.Host.job = self
+        autotest.Autotest.job = self
+
+        if self.resultdir:
+            os.chdir(self.resultdir)
+            # touch status.log so that the parser knows a job is running here
+            #open(self.get_status_log_path(), 'a').close()
+            self.enable_external_logging()
+
+        fd, sub_job_filepath = tempfile.mkstemp(dir=self.tmpdir)
+        os.close(fd)
+        self._sub_state = base_job.job_state()
+        self._sub_state.set_backing_file(sub_job_filepath)
+
+        self._sub_state.set('autotests', 'count', 0)
+
+
     @classmethod
     def _get_environ_autodir(cls):
         return os.environ['AUTODIR']
@@ -162,6 +224,7 @@  class base_client_job(base_job.base_job):
         As of now self.record() needs self.resultdir, self._group_level,
         self.harness and of course self._logger.
         """
+        # TODO: Fix the deletion of all debugdir files.
         if not options.cont:
             self._cleanup_debugdir_files()
             self._cleanup_results_dir()
@@ -198,8 +261,9 @@  class base_client_job(base_job.base_job):
             self.harness.test_status(rendered_entry, msg_tag)
             # send the entry to stdout, if it's enabled
             logging.info(rendered_entry)
+        self._indenter = status_indenter(self)
         self._logger = base_job.status_logger(
-            self, status_indenter(self), record_hook=client_job_record_hook,
+            self, self._indenter, record_hook=client_job_record_hook,
             tap_writer=self._tap)
 
     def _post_record_init(self, control, options, drop_caches,
@@ -1201,6 +1265,83 @@  class base_client_job(base_job.base_job):
         self._state.set('client', 'sysinfo', state)
 
 
+    def preprocess_client_state(self):
+        """
+        Produce a state file for initializing the state of a client job.
+
+        Creates a new client state file with all the current server state, as
+        well as some pre-set client state.
+
+        @returns The path of the file the state was written into.
+        """
+        # stash any existing client state and count nested client jobs
+        def group_func():
+            if self._state.has_namespace('client-s'):
+                self._sub_state.set('autotests', 'count',
+                                self._sub_state.get('autotests', 'count') + 1)
+            else:
+                self._state.rename_namespace("client", "client-s")
+                self._sub_state.set('autotests', 'count', 1)
+
+        self._state.atomic(group_func)
+
+        self._state.set('client', 'sysinfo', self.sysinfo.serialize())
+
+        # dump the state out to a tempfile
+        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
+        os.close(fd)
+
+        # write_to_file doesn't need locking, we exclusively own file_path
+        self._state.write_to_file(file_path)
+        return file_path
+
+
+    def postprocess_client_state(self, state_path):
+        """
+        Update the state of this job with the state from a client job.
+
+        Updates the state of the server side of a job with the final state
+        of a client job that was run. Updates the non-client-specific state,
+        pulls in some specific bits from the client-specific state, and then
+        discards the rest. Removes the state file afterwards.
+
+        @param state_path: A path to the state file from the client.
+        """
+        # update the on-disk state
+        try:
+            self._state.read_from_file(state_path)
+            os.remove(state_path)
+        except OSError, e:
+            # ignore file-not-found errors
+            if e.errno != errno.ENOENT:
+                raise
+            else:
+                logging.debug('Client state file %s not found', state_path)
+
+        # update the sysinfo state
+        if self._state.has('client', 'sysinfo'):
+            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))
+
+        # drop all the client-specific state
+        self._state.discard_namespace('client')
+
+
+    def clean_state(self):
+        """
+        Restore the client namespace after a sub client job ends.
+        """
+        def group_func():
+            if self._state.has_namespace('client-s'):
+                if self._sub_state.get('autotests', 'count') > 1:
+                    self._sub_state.set('autotests', 'count',
+                                    self._sub_state.get('autotests', 'count') - 1)
+                else:
+                    if self._state.has_namespace('client'):
+                        self._state.discard_namespace('client')
+                    self._state.rename_namespace('client-s', 'client')
+        self._sub_state.atomic(group_func)
+
+
 class disk_usage_monitor:
     def __init__(self, logging_func, device, max_mb_per_hour):
         self.func = logging_func
diff --git a/client/bin/profilers.py b/client/bin/profilers.py
index df152d9..d3b1556 100644
--- a/client/bin/profilers.py
+++ b/client/bin/profilers.py
@@ -5,7 +5,12 @@  from autotest_lib.client.common_lib import utils, error, profiler_manager
 
 
 class profilers(profiler_manager.profiler_manager):
+    def __init__(self, job):
+        super(profilers, self).__init__(job)
+        self.add_log = {}
+
     def load_profiler(self, profiler, args, dargs):
+        self.add_log[profiler] = (args, dargs)
         prof_dir = os.path.join(self.job.autodir, "profilers", profiler)
 
         try:
diff --git a/client/common_lib/autotest.py b/client/common_lib/autotest.py
index b103fb3..01ef65b 100644
--- a/client/common_lib/autotest.py
+++ b/client/common_lib/autotest.py
@@ -7,14 +7,9 @@  from autotest_lib.client.common_lib import base_job, log, error, autotemp
 from autotest_lib.client.common_lib import global_config, packages
 from autotest_lib.client.common_lib import utils as client_utils
 
-AUTOTEST_SVN  = 'svn://test.kernel.org/autotest/trunk/client'
+AUTOTEST_SVN = 'svn://test.kernel.org/autotest/trunk/client'
 AUTOTEST_HTTP = 'http://test.kernel.org/svn/autotest/trunk/client'
 
-# Timeouts for powering down and up respectively
-HALT_TIME = 300
-BOOT_TIME = 1800
-CRASH_RECOVERY_TIME = 9000
-
 
 get_value = global_config.global_config.get_config_value
 autoserv_prebuild = get_value('AUTOSERV', 'enable_server_prebuild',
@@ -37,7 +32,7 @@  class BaseAutotest(installable_object.InstallableObject):
     implement the unimplemented methods in parent classes.
     """
 
-    def __init__(self, host = None):
+    def __init__(self, host=None):
         self.host = host
         self.got = False
         self.installed = False
@@ -223,7 +218,7 @@  class BaseAutotest(installable_object.InstallableObject):
             except (error.PackageInstallError, error.AutoservRunError,
                     global_config.ConfigError), e:
                 logging.info("Could not install autotest using the packaging "
-                             "system: %s. Trying other methods",  e)
+                             "system: %s. Trying other methods", e)
 
         # try to install from file or directory
         if self.source_material:
@@ -272,7 +267,7 @@  class BaseAutotest(installable_object.InstallableObject):
         self.installed = False
 
 
-    def get(self, location = None):
+    def get(self, location=None):
         if not location:
             location = os.path.join(self.serverdir, '../client')
             location = os.path.abspath(location)
@@ -290,7 +285,7 @@  class BaseAutotest(installable_object.InstallableObject):
 
     def run(self, control_file, results_dir='.', host=None, timeout=None,
             tag=None, parallel_flag=False, background=False,
-            client_disconnect_timeout=1800):
+            client_disconnect_timeout=None):
         """
         Run an autotest job on the remote machine.
 
@@ -307,7 +302,8 @@  class BaseAutotest(installable_object.InstallableObject):
                 a background job; the code calling run will be responsible
                 for monitoring the client and collecting the results.
         @param client_disconnect_timeout: Seconds to wait for the remote host
-                to come back after a reboot.  [default: 30 minutes]
+                to come back after a reboot. Defaults to the host setting for
+                DEFAULT_REBOOT_TIMEOUT.
 
         @raises AutotestRunError: If there is a problem executing
                 the control file.
@@ -315,6 +311,9 @@  class BaseAutotest(installable_object.InstallableObject):
         host = self._get_host_and_setup(host)
         results_dir = os.path.abspath(results_dir)
 
+        if client_disconnect_timeout is None:
+            client_disconnect_timeout = host.DEFAULT_REBOOT_TIMEOUT
+
         if tag:
             results_dir = os.path.join(results_dir, tag)
 
@@ -399,9 +398,12 @@  class BaseAutotest(installable_object.InstallableObject):
         if os.path.abspath(tmppath) != os.path.abspath(control_file):
             os.remove(tmppath)
 
-        atrun.execute_control(
-                timeout=timeout,
-                client_disconnect_timeout=client_disconnect_timeout)
+        try:
+            atrun.execute_control(
+                    timeout=timeout,
+                    client_disconnect_timeout=client_disconnect_timeout)
+        finally:
+            host.job.clean_state()
 
 
     def run_timed_test(self, test_name, results_dir='.', host=None,
@@ -700,12 +702,13 @@  class _BaseRun(object):
     def _wait_for_reboot(self, old_boot_id):
         logging.info("Client is rebooting")
         logging.info("Waiting for client to halt")
-        if not self.host.wait_down(HALT_TIME, old_boot_id=old_boot_id):
+        if not self.host.wait_down(self.host.WAIT_DOWN_REBOOT_TIMEOUT,
+                                   old_boot_id=old_boot_id):
             err = "%s failed to shutdown after %d"
-            err %= (self.host.hostname, HALT_TIME)
+            err %= (self.host.hostname, self.host.WAIT_DOWN_REBOOT_TIMEOUT)
             raise error.AutotestRunError(err)
         logging.info("Client down, waiting for restart")
-        if not self.host.wait_up(BOOT_TIME):
+        if not self.host.wait_up(self.host.DEFAULT_REBOOT_TIMEOUT):
             # since reboot failed
             # hardreset the machine once if possible
             # before failing this control file
@@ -719,7 +722,8 @@  class _BaseRun(object):
                 warning %= self.host.hostname
                 logging.warning(warning)
             raise error.AutotestRunError("%s failed to boot after %ds" %
-                                         (self.host.hostname, BOOT_TIME))
+                                         (self.host.hostname,
+                                          self.host.DEFAULT_REBOOT_TIMEOUT))
         self.host.reboot_followup()
 
 
@@ -765,7 +769,7 @@  class _BaseRun(object):
                 self.log_unexpected_abort(logger)
 
                 # give the client machine a chance to recover from a crash
-                self.host.wait_up(CRASH_RECOVERY_TIME)
+                self.host.wait_up(self.host.HOURS_TO_WAIT_FOR_RECOVERY * 3600)
                 msg = ("Aborting - unexpected final status message from "
                        "client on %s: %s\n") % (self.host.hostname, last)
                 raise error.AutotestRunError(msg)
diff --git a/client/common_lib/base_hosts/__init__.py b/client/common_lib/base_hosts/__init__.py
index c2b42ca..c7ef409 100644
--- a/client/common_lib/base_hosts/__init__.py
+++ b/client/common_lib/base_hosts/__init__.py
@@ -1,6 +1,14 @@ 
+# Copyright 2009 Google Inc. Released under the GPL v2
+
+"""This is a convenience module to import all available types of hosts.
+
+Implementation details:
+You should 'import hosts' instead of importing every available host module.
+"""
+
 from autotest_lib.client.common_lib import utils
 import base_classes
 
 Host = utils.import_site_class(
     __file__, "autotest_lib.client.common_lib.base_hosts.site_host", "SiteHost",
-    base_classes.Host)
\ No newline at end of file
+    base_classes.Host)
diff --git a/client/common_lib/base_hosts/base_classes.py b/client/common_lib/base_hosts/base_classes.py
index b267e79..68cabe8 100644
--- a/client/common_lib/base_hosts/base_classes.py
+++ b/client/common_lib/base_hosts/base_classes.py
@@ -50,10 +50,14 @@  class Host(object):
     """
 
     job = None
-    DEFAULT_REBOOT_TIMEOUT = 1800
-    WAIT_DOWN_REBOOT_TIMEOUT = 840
-    WAIT_DOWN_REBOOT_WARNING = 540
-    HOURS_TO_WAIT_FOR_RECOVERY = 2.5
+    DEFAULT_REBOOT_TIMEOUT = global_config.global_config.get_config_value(
+        "HOSTS", "default_reboot_timeout", type=int, default=1800)
+    WAIT_DOWN_REBOOT_TIMEOUT = global_config.global_config.get_config_value(
+        "HOSTS", "wait_down_reboot_timeout", type=int, default=840)
+    WAIT_DOWN_REBOOT_WARNING = global_config.global_config.get_config_value(
+        "HOSTS", "wait_down_reboot_warning", type=int, default=540)
+    HOURS_TO_WAIT_FOR_RECOVERY = global_config.global_config.get_config_value(
+        "HOSTS", "hours_to_wait_for_recovery", type=float, default=2.5)
     # the number of hardware repair requests that need to happen before we
     # actually send machines to hardware repair
     HARDWARE_REPAIR_REQUEST_THRESHOLD = 4
@@ -188,18 +192,18 @@  class Host(object):
 
 
     def wait_for_restart(self, timeout=DEFAULT_REBOOT_TIMEOUT,
+                         down_timeout=WAIT_DOWN_REBOOT_TIMEOUT,
+                         down_warning=WAIT_DOWN_REBOOT_WARNING,
                          log_failure=True, old_boot_id=None, **dargs):
         """ Wait for the host to come back from a reboot. This is a generic
         implementation based entirely on wait_up and wait_down. """
-        if not self.wait_down(timeout=self.WAIT_DOWN_REBOOT_TIMEOUT,
-                              warning_timer=self.WAIT_DOWN_REBOOT_WARNING,
+        if not self.wait_down(timeout=down_timeout,
+                              warning_timer=down_warning,
                               old_boot_id=old_boot_id):
             if log_failure:
                 self.record("ABORT", None, "reboot.verify", "shut down failed")
             raise error.AutoservShutdownError("Host did not shut down")
 
-        self.wait_up(timeout)
-        time.sleep(2)    # this is needed for complete reliability
         if self.wait_up(timeout):
             self.record("GOOD", None, "reboot.verify")
             self.reboot_followup(**dargs)
@@ -238,12 +242,12 @@  class Host(object):
 
         @raises AutoservDiskFullHostError if path has less than gb GB free.
         """
-        one_mb = 10**6  # Bytes (SI unit).
+        one_mb = 10 ** 6  # Bytes (SI unit).
         mb_per_gb = 1000.0
         logging.info('Checking for >= %s GB of space under %s on machine %s',
                      gb, path, self.hostname)
         df = self.run('df -PB %d %s | tail -1' % (one_mb, path)).stdout.split()
-        free_space_gb = int(df[3])/mb_per_gb
+        free_space_gb = int(df[3]) / mb_per_gb
         if free_space_gb < gb:
             raise error.AutoservDiskFullHostError(path, gb, free_space_gb)
         else:
diff --git a/client/common_lib/base_job.py b/client/common_lib/base_job.py
index eef9efc..300203e 100644
--- a/client/common_lib/base_job.py
+++ b/client/common_lib/base_job.py
@@ -348,6 +348,24 @@  class job_state(object):
 
 
     @with_backing_file
+    def rename_namespace(self, namespace, new_namespace):
+        """Saves the value given with the provided name.
+
+        This operation must be atomic.
+
+        @param namespace: The namespace that the property should be stored in.
+        @param new_namespace: The name the value should be saved with.
+        """
+        if namespace in self._state:
+            self._state[new_namespace] = self._state[namespace]
+            del self._state[namespace]
+            logging.debug('Namespace %s rename to %s', namespace,
+                          new_namespace)
+        elif not namespace in self._state:
+            raise KeyError('No namespace %s in namespaces' % (namespace))
+
+
+    @with_backing_file
     def has(self, namespace, name):
         """Return a boolean indicating if namespace.name is defined.
 
@@ -361,6 +379,17 @@  class job_state(object):
 
 
     @with_backing_file
+    def has_namespace(self, namespace):
+        """Return a boolean indicating if namespace.name is defined.
+
+        @param namespace: The namespace to check for a definition.
+
+        @return: True if the namespace is defined, False otherwise.
+        """
+        return namespace in self._state
+
+
+    @with_backing_file
     def discard(self, namespace, name):
         """If namespace.name is a defined value, deletes it.
 
@@ -389,6 +418,13 @@  class job_state(object):
         logging.debug('Persistent state %s.* deleted', namespace)
 
 
+    @with_backing_file
+    def atomic(self, func, *args, **kargs):
+        """Use state like synchronization tool between process.
+        """
+        return func(*args, **kargs)
+
+
     @staticmethod
     def property_factory(state_attribute, property_attribute, default,
                          namespace='global_properties'):
@@ -933,7 +969,7 @@  class base_job(object):
             Returns a status_logger instance for recording job status logs.
     """
 
-   # capture the dependency on several helper classes with factories
+    # capture the dependency on several helper classes with factories
     _job_directory = job_directory
     _job_state = job_state
 
@@ -1208,3 +1244,82 @@  class base_job(object):
                 logs should be written into the subdirectory status log file.
         """
         self._get_status_logger().record_entry(entry, log_in_subdir)
+
+
+    def clean_state(self):
+        pass
+
+
+    def _update_uncollected_logs_list(self, update_func):
+        """Updates the uncollected logs list in a multi-process safe manner.
+
+        @param update_func - a function that updates the list of uncollected
+            logs. Should take one parameter, the list to be updated.
+        """
+        if self._uncollected_log_file:
+            log_file = open(self._uncollected_log_file, "r+")
+            fcntl.flock(log_file, fcntl.LOCK_EX)
+            try:
+                uncollected_logs = pickle.load(log_file)
+                update_func(uncollected_logs)
+                log_file.seek(0)
+                log_file.truncate()
+                pickle.dump(uncollected_logs, log_file)
+                log_file.flush()
+            finally:
+                fcntl.flock(log_file, fcntl.LOCK_UN)
+                log_file.close()
+
+
+    def add_client_log(self, hostname, remote_path, local_path):
+        """Adds a new set of client logs to the list of uncollected logs,
+        to allow for future log recovery.
+
+        @param hostname - the hostname of the machine holding the logs
+        @param remote_path - the directory on the remote machine holding logs
+        @param local_path - the local directory to copy the logs into
+        """
+        def update_func(logs_list):
+            logs_list.append((hostname, remote_path, local_path))
+        self._update_uncollected_logs_list(update_func)
+
+
+    def remove_client_log(self, hostname, remote_path, local_path):
+        """Removes a set of client logs from the list of uncollected logs,
+        to allow for future log recovery.
+
+        @param hostname - the hostname of the machine holding the logs
+        @param remote_path - the directory on the remote machine holding logs
+        @param local_path - the local directory to copy the logs into
+        """
+        def update_func(logs_list):
+            logs_list.remove((hostname, remote_path, local_path))
+        self._update_uncollected_logs_list(update_func)
+
+
+    def get_client_logs(self):
+        """Retrieves the list of uncollected logs, if it exists.
+
+        @returns A list of (host, remote_path, local_path) tuples. Returns
+                 an empty list if no uncollected logs file exists.
+        """
+        log_exists = (self._uncollected_log_file and
+                      os.path.exists(self._uncollected_log_file))
+        if log_exists:
+            return pickle.load(open(self._uncollected_log_file))
+        else:
+            return []
+
+
+    def get_record_context(self):
+        """Returns an object representing the current job.record context.
+
+        The object returned is an opaque object with a 0-arg restore method
+        which can be called to restore the job.record context (i.e. indentation)
+        to the current level. The intention is that it should be used when
+        something external which generates job.record calls (e.g. an autotest
+        client) can fail catastrophically and the server job record state
+        needs to be reset to its original "known good" state.
+
+        @return: A context object with a 0-arg restore() method."""
+        return self._indenter.get_context()
diff --git a/client/common_lib/hosts/monitors/console.py b/client/common_lib/hosts/monitors/console.py
index c516f9f..60e561f 100755
--- a/client/common_lib/hosts/monitors/console.py
+++ b/client/common_lib/hosts/monitors/console.py
@@ -5,7 +5,7 @@ 
 
 import gzip, optparse, os, signal, sys, time
 import common
-from autotest_lib.server.hosts.monitors import monitors_util
+from autotest_lib.client.common_lib.hosts.monitors import monitors_util
 
 PATTERNS_PATH = os.path.join(os.path.dirname(__file__), 'console_patterns')
 
diff --git a/server/autotest_unittest.py b/server/autotest_unittest.py
index 78d0dec..1f038b4 100755
--- a/server/autotest_unittest.py
+++ b/server/autotest_unittest.py
@@ -234,6 +234,8 @@  class TestBaseAutotest(unittest.TestCase):
         run_obj.execute_control.expect_call(timeout=30,
                                             client_disconnect_timeout=1800)
 
+        self.host.job.clean_state.expect_call()
+
         # run and check output
         self.base_autotest.run(control, timeout=30)
         self.god.check_playback()
diff --git a/server/server_job.py b/server/server_job.py
index e3ffbc8..7da0cf0 100644
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -748,20 +748,6 @@  class base_server_job(base_job.base_job):
         return subdirectory
 
 
-    def get_record_context(self):
-        """Returns an object representing the current job.record context.
-
-        The object returned is an opaque object with a 0-arg restore method
-        which can be called to restore the job.record context (i.e. indentation)
-        to the current level. The intention is that it should be used when
-        something external which generate job.record calls (e.g. an autotest
-        client) can fail catastrophically and the server job record state
-        needs to be reset to its original "known good" state.
-
-        @return: A context object with a 0-arg restore() method."""
-        return self._indenter.get_context()
-
-
     def record_summary(self, status_code, test_name, reason='', attributes=None,
                        distinguishing_attributes=(), child_test_ids=None):
         """Record a summary test result.
@@ -837,67 +823,6 @@  class base_server_job(base_job.base_job):
             return None
 
 
-    def _update_uncollected_logs_list(self, update_func):
-        """Updates the uncollected logs list in a multi-process safe manner.
-
-        @param update_func - a function that updates the list of uncollected
-            logs. Should take one parameter, the list to be updated.
-        """
-        if self._uncollected_log_file:
-            log_file = open(self._uncollected_log_file, "r+")
-            fcntl.flock(log_file, fcntl.LOCK_EX)
-        try:
-            uncollected_logs = pickle.load(log_file)
-            update_func(uncollected_logs)
-            log_file.seek(0)
-            log_file.truncate()
-            pickle.dump(uncollected_logs, log_file)
-            log_file.flush()
-        finally:
-            fcntl.flock(log_file, fcntl.LOCK_UN)
-            log_file.close()
-
-
-    def add_client_log(self, hostname, remote_path, local_path):
-        """Adds a new set of client logs to the list of uncollected logs,
-        to allow for future log recovery.
-
-        @param host - the hostname of the machine holding the logs
-        @param remote_path - the directory on the remote machine holding logs
-        @param local_path - the local directory to copy the logs into
-        """
-        def update_func(logs_list):
-            logs_list.append((hostname, remote_path, local_path))
-        self._update_uncollected_logs_list(update_func)
-
-
-    def remove_client_log(self, hostname, remote_path, local_path):
-        """Removes a set of client logs from the list of uncollected logs,
-        to allow for future log recovery.
-
-        @param host - the hostname of the machine holding the logs
-        @param remote_path - the directory on the remote machine holding logs
-        @param local_path - the local directory to copy the logs into
-        """
-        def update_func(logs_list):
-            logs_list.remove((hostname, remote_path, local_path))
-        self._update_uncollected_logs_list(update_func)
-
-
-    def get_client_logs(self):
-        """Retrieves the list of uncollected logs, if it exists.
-
-        @returns A list of (host, remote_path, local_path) tuples. Returns
-                 an empty list if no uncollected logs file exists.
-        """
-        log_exists = (self._uncollected_log_file and
-                      os.path.exists(self._uncollected_log_file))
-        if log_exists:
-            return pickle.load(open(self._uncollected_log_file))
-        else:
-            return []
-
-
     def _fill_server_control_namespace(self, namespace, protect=True):
         """
         Prepare a namespace to be used when executing server control files.
diff --git a/server/tests/netperf2/netperf2.py b/server/tests/netperf2/netperf2.py
index a4531b3..ac16453 100644
--- a/server/tests/netperf2/netperf2.py
+++ b/server/tests/netperf2/netperf2.py
@@ -1,5 +1,7 @@ 
 from autotest_lib.client.common_lib import subcommand, hosts
 from autotest_lib.server import utils, autotest, test
+from autotest_lib.client.common_lib import error
+import time as btime
 
 class netperf2(test.test):
     version = 2