diff mbox

[KVM-AUTOTEST,3/3] KVM test: kvm_subprocess: allow garbage collection of kvm_tail instances

Message ID 1278251117-31220-3-git-send-email-mgoldish@redhat.com (mailing list archive)
State New, archived
Headers show

Commit Message

Michael Goldish July 4, 2010, 1:45 p.m. UTC
None
diff mbox

Patch

diff --git a/client/tests/kvm/kvm_preprocessing.py b/client/tests/kvm/kvm_preprocessing.py
index 2f6994a..123928e 100644
--- a/client/tests/kvm/kvm_preprocessing.py
+++ b/client/tests/kvm/kvm_preprocessing.py
@@ -344,9 +344,8 @@  def postprocess(test, params, env):
                 else:
                     vm.destroy(gracefully=False)
 
-    # Kill the tailing threads of all VMs
-    for vm in kvm_utils.env_get_all_vms(env):
-        vm.kill_tail_thread()
+    # Kill all kvm_subprocess tail threads
+    kvm_subprocess.kill_tail_threads()
 
     # Terminate tcpdump if no VMs are alive
     living_vms = [vm for vm in kvm_utils.env_get_all_vms(env) if vm.is_alive()]
diff --git a/client/tests/kvm/kvm_subprocess.py b/client/tests/kvm/kvm_subprocess.py
index 93a8429..f815069 100755
--- a/client/tests/kvm/kvm_subprocess.py
+++ b/client/tests/kvm/kvm_subprocess.py
@@ -548,6 +548,21 @@  class kvm_spawn:
         self.send(str + self.linesep)
 
 
+_thread_kill_requested = False
+
+def kill_tail_threads():
+    """
+    Kill all kvm_tail threads.
+
+    After calling this function no new threads should be started.
+    """
+    global _thread_kill_requested
+    _thread_kill_requested = True
+    for t in threading.enumerate():
+        if hasattr(t, "name") and t.name.startswith("tail_thread"):
+            t.join(10)
+
+
 class kvm_tail(kvm_spawn):
     """
     This class runs a child process in the background and sends its output in
@@ -608,7 +623,6 @@  class kvm_tail(kvm_spawn):
 
         # Start the thread in the background
         self.tail_thread = None
-        self.__thread_kill_requested = False
         if termination_func or output_func:
             self._start_thread()
 
@@ -675,15 +689,6 @@  class kvm_tail(kvm_spawn):
         self.output_prefix = output_prefix
 
 
-    def kill_tail_thread(self):
-        """
-        Stop the tailing thread which calls output_func() and
-        termination_func().
-        """
-        self.__thread_kill_requested = True
-        self._join_thread()
-
-
     def _tail(self):
         def print_line(text):
             # Pre-pend prefix and remove trailing whitespace
@@ -695,60 +700,68 @@  class kvm_tail(kvm_spawn):
             except TypeError:
                 pass
 
-        fd = self._get_fd("tail")
-        buffer = ""
-        while True:
-            if self.__thread_kill_requested:
+        try:
+            fd = self._get_fd("tail")
+            buffer = ""
+            while True:
+                global _thread_kill_requested
+                if _thread_kill_requested:
+                    return
+                try:
+                    # See if there's any data to read from the pipe
+                    r, w, x = select.select([fd], [], [], 0.05)
+                except:
+                    break
+                if fd in r:
+                    # Some data is available; read it
+                    new_data = os.read(fd, 1024)
+                    if not new_data:
+                        break
+                    buffer += new_data
+                    # Send the output to output_func line by line
+                    # (except for the last line)
+                    if self.output_func:
+                        lines = buffer.split("\n")
+                        for line in lines[:-1]:
+                            print_line(line)
+                    # Leave only the last line
+                    last_newline_index = buffer.rfind("\n")
+                    buffer = buffer[last_newline_index+1:]
+                else:
+                    # No output is available right now; flush the buffer
+                    if buffer:
+                        print_line(buffer)
+                        buffer = ""
+            # The process terminated; print any remaining output
+            if buffer:
+                print_line(buffer)
+            # Get the exit status, print it and send it to termination_func
+            status = self.get_status()
+            if status is None:
                 return
+            print_line("(Process terminated with status %s)" % status)
             try:
-                # See if there's any data to read from the pipe
-                r, w, x = select.select([fd], [], [], 0.05)
-            except:
-                break
-            if fd in r:
-                # Some data is available; read it
-                new_data = os.read(fd, 1024)
-                if not new_data:
-                    break
-                buffer += new_data
-                # Send the output to output_func line by line
-                # (except for the last line)
-                if self.output_func:
-                    lines = buffer.split("\n")
-                    for line in lines[:-1]:
-                        print_line(line)
-                # Leave only the last line
-                last_newline_index = buffer.rfind("\n")
-                buffer = buffer[last_newline_index+1:]
-            else:
-                # No output is available right now; flush the buffer
-                if buffer:
-                    print_line(buffer)
-                    buffer = ""
-        # The process terminated; print any remaining output
-        if buffer:
-            print_line(buffer)
-        # Get the exit status, print it and send it to termination_func
-        status = self.get_status()
-        if status is None:
-            return
-        print_line("(Process terminated with status %s)" % status)
-        try:
-            params = self.termination_params + (status,)
-            self.termination_func(*params)
-        except TypeError:
-            pass
+                params = self.termination_params + (status,)
+                self.termination_func(*params)
+            except TypeError:
+                pass
+        finally:
+            self.tail_thread = None
 
 
     def _start_thread(self):
-        self.tail_thread = threading.Thread(None, self._tail)
+        self.tail_thread = threading.Thread(target=self._tail,
+                                            name="tail_thread_%s" % self.id)
         self.tail_thread.start()
 
 
     def _join_thread(self):
         # Wait for the tail thread to exit
-        if self.tail_thread:
-            self.tail_thread.join()
+        # (it's done this way because self.tail_thread may become None at any
+        # time)
+        t = self.tail_thread
+        if t:
+            t.join()
 
 
 class kvm_expect(kvm_tail):
diff --git a/client/tests/kvm/kvm_vm.py b/client/tests/kvm/kvm_vm.py
index 78441c2..879e3dc 100755
--- a/client/tests/kvm/kvm_vm.py
+++ b/client/tests/kvm/kvm_vm.py
@@ -776,14 +776,6 @@  class VM:
         return not self.process or not self.process.is_alive()
 
 
-    def kill_tail_thread(self):
-        """
-        Stop the tailing thread which reports the output of qemu.
-        """
-        if self.process:
-            self.process.kill_tail_thread()
-
-
     def get_params(self):
         """
         Return the VM's params dict. Most modified params take effect only