@@ -344,9 +344,8 @@ def postprocess(test, params, env):
else:
vm.destroy(gracefully=False)
- # Kill the tailing threads of all VMs
- for vm in kvm_utils.env_get_all_vms(env):
- vm.kill_tail_thread()
+ # Kill all kvm_subprocess tail threads
+ kvm_subprocess.kill_tail_threads()
# Terminate tcpdump if no VMs are alive
living_vms = [vm for vm in kvm_utils.env_get_all_vms(env) if vm.is_alive()]
@@ -548,6 +548,21 @@ class kvm_spawn:
self.send(str + self.linesep)
+_thread_kill_requested = False
+
+def kill_tail_threads():
+ """
+ Kill all kvm_tail threads.
+
+ After calling this function no new threads should be started.
+ """
+ global _thread_kill_requested
+ _thread_kill_requested = True
+ for t in threading.enumerate():
+ if hasattr(t, "name") and t.name.startswith("tail_thread"):
+ t.join(10)
+
+
class kvm_tail(kvm_spawn):
"""
This class runs a child process in the background and sends its output in
@@ -608,7 +623,6 @@ class kvm_tail(kvm_spawn):
# Start the thread in the background
self.tail_thread = None
- self.__thread_kill_requested = False
if termination_func or output_func:
self._start_thread()
@@ -675,15 +689,6 @@ class kvm_tail(kvm_spawn):
self.output_prefix = output_prefix
- def kill_tail_thread(self):
- """
- Stop the tailing thread which calls output_func() and
- termination_func().
- """
- self.__thread_kill_requested = True
- self._join_thread()
-
-
def _tail(self):
def print_line(text):
# Pre-pend prefix and remove trailing whitespace
@@ -695,60 +700,68 @@ class kvm_tail(kvm_spawn):
except TypeError:
pass
- fd = self._get_fd("tail")
- buffer = ""
- while True:
- if self.__thread_kill_requested:
+ try:
+ fd = self._get_fd("tail")
+ buffer = ""
+ while True:
+ global _thread_kill_requested
+ if _thread_kill_requested:
+ return
+ try:
+ # See if there's any data to read from the pipe
+ r, w, x = select.select([fd], [], [], 0.05)
+ except:
+ break
+ if fd in r:
+ # Some data is available; read it
+ new_data = os.read(fd, 1024)
+ if not new_data:
+ break
+ buffer += new_data
+ # Send the output to output_func line by line
+ # (except for the last line)
+ if self.output_func:
+ lines = buffer.split("\n")
+ for line in lines[:-1]:
+ print_line(line)
+ # Leave only the last line
+ last_newline_index = buffer.rfind("\n")
+ buffer = buffer[last_newline_index+1:]
+ else:
+ # No output is available right now; flush the buffer
+ if buffer:
+ print_line(buffer)
+ buffer = ""
+ # The process terminated; print any remaining output
+ if buffer:
+ print_line(buffer)
+ # Get the exit status, print it and send it to termination_func
+ status = self.get_status()
+ if status is None:
return
+ print_line("(Process terminated with status %s)" % status)
try:
- # See if there's any data to read from the pipe
- r, w, x = select.select([fd], [], [], 0.05)
- except:
- break
- if fd in r:
- # Some data is available; read it
- new_data = os.read(fd, 1024)
- if not new_data:
- break
- buffer += new_data
- # Send the output to output_func line by line
- # (except for the last line)
- if self.output_func:
- lines = buffer.split("\n")
- for line in lines[:-1]:
- print_line(line)
- # Leave only the last line
- last_newline_index = buffer.rfind("\n")
- buffer = buffer[last_newline_index+1:]
- else:
- # No output is available right now; flush the buffer
- if buffer:
- print_line(buffer)
- buffer = ""
- # The process terminated; print any remaining output
- if buffer:
- print_line(buffer)
- # Get the exit status, print it and send it to termination_func
- status = self.get_status()
- if status is None:
- return
- print_line("(Process terminated with status %s)" % status)
- try:
- params = self.termination_params + (status,)
- self.termination_func(*params)
- except TypeError:
- pass
+ params = self.termination_params + (status,)
+ self.termination_func(*params)
+ except TypeError:
+ pass
+ finally:
+ self.tail_thread = None
def _start_thread(self):
- self.tail_thread = threading.Thread(None, self._tail)
+ self.tail_thread = threading.Thread(target=self._tail,
+ name="tail_thread_%s" % self.id)
self.tail_thread.start()
def _join_thread(self):
# Wait for the tail thread to exit
- if self.tail_thread:
- self.tail_thread.join()
+ # (it's done this way because self.tail_thread may become None at any
+ # time)
+ t = self.tail_thread
+ if t:
+ t.join()
class kvm_expect(kvm_tail):
@@ -776,14 +776,6 @@ class VM:
return not self.process or not self.process.is_alive()
- def kill_tail_thread(self):
- """
- Stop the tailing thread which reports the output of qemu.
- """
- if self.process:
- self.process.kill_tail_thread()
-
-
def get_params(self):
"""
Return the VM's params dict. Most modified params take effect only