diff mbox

[Autotest,3/4] virt: Move migration_multi_host_fd to framework

Message ID 1351703968-27007-4-git-send-email-jzupka@redhat.com (mailing list archive)
State New, archived
Headers show

Commit Message

Jiri Zupka Oct. 31, 2012, 5:19 p.m. UTC
It allows use migration over fd in all multihost_migration tests.

Signed-off-by: Ji?í Župka <jzupka@redhat.com>
---
 kvm/tests/migration_multi_host_fd.py |  196 ----------------------------------
 virttest/utils_test.py               |  178 ++++++++++++++++++++++++++++++-
 2 files changed, 173 insertions(+), 201 deletions(-)
 delete mode 100644 kvm/tests/migration_multi_host_fd.py
diff mbox

Patch

diff --git a/kvm/tests/migration_multi_host_fd.py b/kvm/tests/migration_multi_host_fd.py
deleted file mode 100644
index 1a91be6..0000000
--- a/kvm/tests/migration_multi_host_fd.py
+++ /dev/null
@@ -1,196 +0,0 @@ 
-import logging, socket, time, errno, os, fcntl
-from virttest import utils_test, utils_misc, remote, virt_vm
-from autotest.client.shared import error
-from autotest.client.shared.syncdata import SyncData
-
-
-def run_migration_multi_host_fd(test, params, env):
-    """
-    KVM multi-host migration over fd test:
-
-    Migrate machine over socket's fd. Migration execution progress is
-    described in documentation for migrate method in class MultihostMigration.
-    This test allows migrate only one machine at once.
-
-    @param test: kvm test object.
-    @param params: Dictionary with test parameters.
-    @param env: Dictionary with the test environment.
-    """
-    class TestMultihostMigrationFd(utils_test.MultihostMigration):
-        def __init__(self, test, params, env):
-            super(TestMultihostMigrationFd, self).__init__(test, params, env)
-
-        def migrate_vms_src(self, mig_data):
-            """
-            Migrate vms source.
-
-            @param mig_Data: Data for migration.
-
-            For change way how machine migrates is necessary
-            re implement this method.
-            """
-            logging.info("Start migrating now...")
-            cancel_delay = mig_data.params.get("cancel_delay")
-            if cancel_delay is not None:
-                cancel_delay = int(cancel_delay)
-            vm = mig_data.vms[0]
-            vm.migrate(dest_host=mig_data.dst,
-                       cancel_delay=cancel_delay,
-                       protocol="fd",
-                       fd_src=mig_data.params['migration_fd'])
-
-        def _check_vms_source(self, mig_data):
-            start_mig_tout = mig_data.params.get("start_migration_timeout",
-                                                 None)
-            if start_mig_tout is None:
-                for vm in mig_data.vms:
-                    vm.wait_for_login(timeout=self.login_timeout)
-            self._hosts_barrier(mig_data.hosts, mig_data.mig_id,
-                                'prepare_VMS', 60)
-
-        def _check_vms_dest(self, mig_data):
-            self._hosts_barrier(mig_data.hosts, mig_data.mig_id,
-                                 'prepare_VMS', 120)
-            os.close(mig_data.params['migration_fd'])
-
-        def _connect_to_server(self, host, port, timeout=60):
-            """
-            Connect to network server.
-            """
-            endtime = time.time() + timeout
-            sock = None
-            while endtime > time.time():
-                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                try:
-                    sock.connect((host, port))
-                    break
-                except socket.error, err:
-                    (code, _) = err
-                    if (code != errno.ECONNREFUSED):
-                        raise
-                    time.sleep(1)
-
-            return sock
-
-        def _create_server(self, port, timeout=60):
-            """
-            Create network server.
-            """
-            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-            sock.settimeout(timeout)
-            sock.bind(('', port))
-            sock.listen(1)
-            return sock
-
-        def migration_scenario(self, worker=None):
-            srchost = self.params.get("hosts")[0]
-            dsthost = self.params.get("hosts")[1]
-            mig_port = None
-
-            if params.get("hostid") == self.master_id():
-                mig_port = utils_misc.find_free_port(5200, 6000)
-
-            sync = SyncData(self.master_id(), self.hostid,
-                             self.params.get("hosts"),
-                             {'src': srchost, 'dst': dsthost,
-                              'port': "ports"}, self.sync_server)
-            mig_port = sync.sync(mig_port, timeout=120)
-            mig_port = mig_port[srchost]
-            logging.debug("Migration port %d" % (mig_port))
-
-            if params.get("hostid") != self.master_id():
-                s = self._connect_to_server(srchost, mig_port)
-                try:
-                    fd = s.fileno()
-                    logging.debug("File descrtiptor %d used for"
-                                  " migration." % (fd))
-
-                    self.migrate_wait(["vm1"], srchost, dsthost, mig_mode="fd",
-                                      params_append={"migration_fd": fd})
-                finally:
-                    s.close()
-            else:
-                s = self._create_server(mig_port)
-                try:
-                    conn, _ = s.accept()
-                    fd = conn.fileno()
-                    logging.debug("File descrtiptor %d used for"
-                                  " migration." % (fd))
-
-                    #Prohibits descriptor inheritance.
-                    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
-                    flags |= fcntl.FD_CLOEXEC
-                    fcntl.fcntl(fd, fcntl.F_SETFD, flags)
-
-                    self.migrate_wait(["vm1"], srchost, dsthost, mig_mode="fd",
-                                      params_append={"migration_fd": fd})
-                    conn.close()
-                finally:
-                    s.close()
-
-
-    class TestMultihostMigrationCancel(TestMultihostMigrationFd):
-        def __init__(self, test, params, env):
-            super(TestMultihostMigrationCancel, self).__init__(test, params,
-                                                               env)
-            self.install_path = params.get("cpuflags_install_path", "/tmp")
-            self.vm_mem = int(params.get("mem", "512"))
-            self.srchost = self.params.get("hosts")[0]
-            self.dsthost = self.params.get("hosts")[1]
-            self.vms = params.get("vms").split()
-            self.id = {'src': self.srchost,
-                       'dst': self.dsthost,
-                       "type": "cancel_migration"}
-
-        def check_guest(self):
-            broken_vms = []
-            for vm in self.vms:
-                try:
-                    vm = env.get_vm(vm)
-                    session = vm.wait_for_login(timeout=self.login_timeout)
-                    session.sendline("killall -9 cpuflags-test")
-                except (remote.LoginError, virt_vm.VMError):
-                    broken_vms.append(vm)
-            if broken_vms:
-                raise error.TestError("VMs %s should work on src"
-                                      " host after canceling of"
-                                      " migration." % (broken_vms))
-            # Try migration again without cancel.
-
-        def migration_scenario(self):
-            def worker(mig_data):
-                vm = mig_data.vms[0]
-                session = vm.wait_for_login(timeout=self.login_timeout)
-
-                utils_misc.install_cpuflags_util_on_vm(test, vm,
-                                                       self.install_path,
-                                                   extra_flags="-msse3 -msse2")
-
-                cmd = ("%s/cpuflags-test --stressmem %d %%" %
-                           (os.path.join(self.install_path, "test_cpu_flags"),
-                            self.vm_mem / 2))
-                logging.debug("Sending command: %s" % (cmd))
-                session.sendline(cmd)
-
-            super_cls = super(TestMultihostMigrationCancel, self)
-            super_cls.migration_scenario(worker)
-
-            if params.get("hostid") == self.master_id():
-                self.check_guest()
-
-            self._hosts_barrier(self.hosts, self.id,
-                                'wait_for_cancel', self.login_timeout)
-
-            params["cancel_delay"] = None
-            super(TestMultihostMigrationCancel, self).migration_scenario()
-
-
-    mig = None
-    cancel_delay = params.get("cancel_delay", None)
-    if cancel_delay is None:
-        mig = TestMultihostMigrationFd(test, params, env)
-    else:
-        mig = TestMultihostMigrationCancel(test, params, env)
-
-    mig.run()
diff --git a/virttest/utils_test.py b/virttest/utils_test.py
index eda2099..43a8ae3 100644
--- a/virttest/utils_test.py
+++ b/virttest/utils_test.py
@@ -21,7 +21,7 @@  More specifically:
 @copyright: 2008-2009 Red Hat Inc.
 """
 
-import time, os, logging, re, signal, imp, tempfile, commands
+import time, os, logging, re, signal, imp, tempfile, commands, errno, fcntl, socket
 import threading, shelve
 from Queue import Queue
 from autotest.client.shared import error, global_config
@@ -501,6 +501,11 @@  class MultihostMigration(object):
         raise NotImplementedError
 
 
+    def post_migration(self, vm, cancel_delay, dsthost, vm_ports,
+                             not_wait_for_migration, fd):
+        pass
+
+
     def migrate_vms_src(self, mig_data):
         """
         Migrate vms source.
@@ -510,18 +515,27 @@  class MultihostMigration(object):
         For change way how machine migrates is necessary
         re implement this method.
         """
-        def mig_wrapper(vm, cancel_delay, dsthost, vm_ports):
+        def mig_wrapper(vm, cancel_delay, dsthost, vm_ports,
+                        not_wait_for_migration):
             vm.migrate(cancel_delay=cancel_delay, dest_host=dsthost,
-                       remote_port=vm_ports[vm.name])
+                       remote_port=vm_ports[vm.name],
+                       not_wait_for_migration=not_wait_for_migration)
+
+            self.post_migration(vm, cancel_delay, dsthost, vm_ports,
+                                not_wait_for_migration, None)
 
         logging.info("Start migrating now...")
         cancel_delay = mig_data.params.get("cancel_delay")
         if cancel_delay is not None:
             cancel_delay = int(cancel_delay)
+        not_wait_for_migration = mig_data.params.get("not_wait_for_migration")
+        if not_wait_for_migration == "yes":
+            not_wait_for_migration = True
         multi_mig = []
         for vm in mig_data.vms:
             multi_mig.append((mig_wrapper, (vm, cancel_delay,
-                                            mig_data.dst, mig_data.vm_ports)))
+                                            mig_data.dst, mig_data.vm_ports,
+                                            not_wait_for_migration)))
         utils_misc.parallel(multi_mig)
 
 
@@ -674,7 +688,7 @@  class MultihostMigration(object):
         """
         Kill vms and delete cloned images.
         """
-        storage.postprocess_images(self.test.bindir, self.params)
+        pass
 
 
     def migrate(self, vms_name, srchost, dsthost, start_work=None,
@@ -831,6 +845,160 @@  class MultihostMigration(object):
             self.cleanup()
 
 
+class MultihostMigrationFd(MultihostMigration):
+    def __init__(self, test, params, env):
+        super(MultihostMigrationFd, self).__init__(test, params, env)
+
+    def migrate_vms_src(self, mig_data):
+        """
+        Migrate vms source.
+
+        @param mig_Data: Data for migration.
+
+        For change way how machine migrates is necessary
+        re implement this method.
+        """
+        def mig_wrapper(vm, cancel_delay, dsthost, vm_ports,
+                        not_wait_for_migration, fd):
+            vm.migrate(cancel_delay=cancel_delay, dest_host=dsthost,
+                       not_wait_for_migration=not_wait_for_migration,
+                       protocol="fd",
+                       fd_src=fd)
+
+            self.post_migration(vm, cancel_delay, dsthost, vm_ports,
+                                not_wait_for_migration, fd)
+
+        logging.info("Start migrating now...")
+        cancel_delay = mig_data.params.get("cancel_delay")
+        if cancel_delay is not None:
+            cancel_delay = int(cancel_delay)
+        not_wait_for_migration = mig_data.params.get("not_wait_for_migration")
+        if not_wait_for_migration == "yes":
+            not_wait_for_migration = True
+
+        multi_mig = []
+        for vm in mig_data.vms:
+            fd = vm.params.get("migration_fd")
+            multi_mig.append((mig_wrapper, (vm, cancel_delay,
+                                            mig_data.dst, mig_data.vm_ports,
+                                            not_wait_for_migration,
+                                            fd)))
+        utils_misc.parallel(multi_mig)
+
+    def _check_vms_source(self, mig_data):
+        start_mig_tout = mig_data.params.get("start_migration_timeout", None)
+        if start_mig_tout is None:
+            for vm in mig_data.vms:
+                vm.wait_for_login(timeout=self.login_timeout)
+        self._hosts_barrier(mig_data.hosts, mig_data.mig_id,
+                            'prepare_VMS', 60)
+
+    def _check_vms_dest(self, mig_data):
+        self._hosts_barrier(mig_data.hosts, mig_data.mig_id,
+                             'prepare_VMS', 120)
+        for vm in mig_data.vms:
+            fd = vm.params.get("migration_fd")
+            os.close(fd)
+
+    def _connect_to_server(self, host, port, timeout=60):
+        """
+        Connect to network server.
+        """
+        endtime = time.time() + timeout
+        sock = None
+        while endtime > time.time():
+            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            try:
+                sock.connect((host, port))
+                break
+            except socket.error, err:
+                (code, _) = err
+                if (code != errno.ECONNREFUSED):
+                    raise
+                time.sleep(1)
+
+        return sock
+
+    def _create_server(self, port, timeout=60):
+        """
+        Create network server.
+        """
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        sock.settimeout(timeout)
+        sock.bind(('', port))
+        sock.listen(1)
+        return sock
+
+    def migrate_wait(self, vms_name, srchost, dsthost, start_work=None,
+                      check_work=None, mig_mode="fd", params_append=None):
+        vms_count = len(vms_name)
+        mig_ports = []
+
+        if self.params.get("hostid") == srchost:
+            last_port = 5199
+            for _ in range(vms_count):
+                last_port = utils_misc.find_free_port(last_port+1, 6000)
+                mig_ports.append(last_port)
+
+        sync = SyncData(self.master_id(), self.hostid,
+                         self.params.get("hosts"),
+                         {'src': srchost, 'dst': dsthost,
+                          'port': "ports"}, self.sync_server)
+
+        mig_ports = sync.sync(mig_ports, timeout=120)
+        mig_ports = mig_ports[srchost]
+        logging.debug("Migration port %s" % (mig_ports))
+
+        if self.params.get("hostid") != srchost:
+            sockets = []
+            for mig_port in mig_ports:
+                sockets.append(self._connect_to_server(srchost, mig_port))
+            try:
+                fds = {}
+                for s, vm_name in zip(sockets, vms_name):
+                    fds["migration_fd_%s" % vm_name] = s.fileno()
+                logging.debug("File descrtiptors %s used for"
+                              " migration." % (fds))
+
+                super_cls = super(MultihostMigrationFd, self)
+                super_cls.migrate_wait(vms_name, srchost, dsthost,
+                                  start_work=start_work, mig_mode="fd",
+                                  params_append=fds)
+            finally:
+                for s in sockets:
+                    s.close()
+        else:
+            sockets = []
+            for mig_port in mig_ports:
+                sockets.append(self._create_server(mig_port))
+            try:
+                conns = []
+                for s in sockets:
+                    conns.append(s.accept()[0])
+                fds = {}
+                for conn, vm_name in zip(conns, vms_name):
+                    fds["migration_fd_%s" % vm_name] = conn.fileno()
+                logging.debug("File descrtiptors %s used for"
+                              " migration." % (fds))
+
+                #Prohibits descriptor inheritance.
+                for fd in fds.values():
+                    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+                    flags |= fcntl.FD_CLOEXEC
+                    fcntl.fcntl(fd, fcntl.F_SETFD, flags)
+
+                super_cls = super(MultihostMigrationFd, self)
+                super_cls.migrate_wait(vms_name, srchost, dsthost,
+                                  start_work=start_work, mig_mode="fd",
+                                  params_append=fds)
+                for conn in conns:
+                    conn.close()
+            finally:
+                for s in sockets:
+                    s.close()
+
+
 def stop_windows_service(session, service, timeout=120):
     """
     Stop a Windows service using sc.