@@ -6,6 +6,11 @@ Interfaces to the QEMU monitor.
import socket, time, threading, logging
import kvm_utils
+try:
+ import json
+except ImportError:
+ logging.warning("Could not import json module. "
+ "QMP monitor functionality disabled.")
class MonitorError(Exception):
@@ -28,6 +33,14 @@ class MonitorProtocolError(MonitorError):
pass
+class MonitorNotSupportedError(MonitorError):
+ pass
+
+
+class QMPCmdError(MonitorError):
+ pass
+
+
class Monitor:
"""
Common code for monitor classes.
@@ -352,3 +365,295 @@ class HumanMonitor(Monitor):
@return: The command's output
"""
return self._get_command_output("mouse_button %d" % state)
+
+
+class QMPMonitor(Monitor):
+ """
+ Wraps QMP monitor commands.
+ """
+
+ def __init__(self, name, filename, suppress_exceptions=False):
+ """
+ Connect to the monitor socket, read the greeting message and issue the
+ qmp_capabilities command. Also make sure the json module is available.
+
+ @param name: Monitor identifier (a string)
+ @param filename: Monitor socket filename
+ @raise MonitorConnectError: Raised if the connection fails and
+ suppress_exceptions is False
+ @raise MonitorProtocolError: Raised if the no QMP greeting message is
+ received and suppress_exceptions is False
+ @raise MonitorNotSupportedError: Raised if json isn't available and
+ suppress_exceptions is False
+ @note: Other exceptions may be raised if the qmp_capabilities command
+ fails. See _get_command_output's docstring.
+ """
+ try:
+ Monitor.__init__(self, name, filename)
+
+ self.protocol = "qmp"
+ self._greeting = None
+ self._events = []
+
+ # Make sure json is available
+ try:
+ json
+ except NameError:
+ raise MonitorNotSupportedError("QMP requires the json module "
+ "(Python 2.6 and up)")
+
+ # Read greeting message
+ end_time = time.time() + 20
+ while time.time() < end_time:
+ for obj in self._read_objects():
+ if "QMP" in obj:
+ self._greeting = obj["QMP"]
+ break
+ if self._greeting:
+ break
+ time.sleep(0.1)
+ else:
+ raise MonitorProtocolError("No QMP greeting message received")
+
+ # Issue qmp_capabilities
+ self._get_command_output("qmp_capabilities")
+
+ except MonitorError, e:
+ if suppress_exceptions:
+ logging.warn(e)
+ else:
+ raise
+
+
+ # Private methods
+
+ def _build_cmd(self, cmd, args=None):
+ obj = {"execute": cmd}
+ if args:
+ obj["arguments"] = args
+ return obj
+
+
+ def _read_objects(self, timeout=5):
+ """
+ Read lines from monitor and try to decode them.
+ Stop when all available lines have been successfully decoded, or when
+ timeout expires. If any decoded objects are asynchronous events, store
+ them in self._events. Return all decoded objects.
+
+ @param timeout: Time to wait for all lines to decode successfully
+ @return: A list of objects
+ """
+ s = ""
+ objs = []
+ end_time = time.time() + timeout
+ while time.time() < end_time:
+ s += self._recvall()
+ for line in s.splitlines():
+ if not line:
+ continue
+ try:
+ obj = json.loads(line)
+ except:
+ # Found an incomplete or broken line -- keep reading
+ break
+ objs += [obj]
+ else:
+ # All lines are OK -- stop reading
+ break
+ time.sleep(0.1)
+ # Keep track of asynchronous events
+ self._events += [obj for obj in objs if "event" in obj]
+ return objs
+
+
+ def _send_command(self, cmd, args=None):
+ """
+ Send command without waiting for response.
+
+ @param cmd: Command to send
+ @param args: A dict containing command arguments, or None
+ @raise MonitorLockError: Raised if the lock cannot be acquired
+ @raise MonitorSendError: Raised if the command cannot be sent
+ """
+ if not self._acquire_lock(20):
+ raise MonitorLockError("Could not acquire exclusive lock to send "
+ "QMP command '%s'" % cmd)
+
+ try:
+ cmdobj = self._build_cmd(cmd, args)
+ try:
+ self._socket.sendall(json.dumps(cmdobj) + "\n")
+ except socket.error:
+ raise MonitorSendError("Could not send QMP command '%s'" % cmd)
+
+ finally:
+ self._lock.release()
+
+
+ def _get_command_output(self, cmd, args=None, timeout=20):
+ """
+ Send monitor command and wait for response.
+
+ @param cmd: Command to send
+ @param args: A dict containing command arguments, or None
+ @param timeout: Time duration to wait for response
+ @return: The response received
+ @raise MonitorLockError: Raised if the lock cannot be acquired
+ @raise MonitorSendError: Raised if the command cannot be sent
+ @raise MonitorProtocolError: Raised if no response is received
+ @raise QMPCmdError: Raised if the response is an error message
+ (the exception's args are (msg, data) where msg is a string and
+ data is the error data)
+ """
+ if not self._acquire_lock(20):
+ raise MonitorLockError("Could not acquire exclusive lock to send "
+ "QMP command '%s'" % cmd)
+
+ try:
+ # Read any data that might be available
+ self._read_objects()
+ # Send command
+ self._send_command(cmd, args)
+ # Read response
+ end_time = time.time() + timeout
+ while time.time() < end_time:
+ for obj in self._read_objects():
+ if "return" in obj:
+ return obj["return"]
+ elif "error" in obj:
+ raise QMPCmdError("QMP command '%s' failed" % cmd,
+ obj["error"])
+ time.sleep(0.1)
+ # No response found
+ raise MonitorProtocolError("Received no response to QMP command "
+ "'%s'" % cmd)
+
+ finally:
+ self._lock.release()
+
+
+ # Public methods
+
+ def is_responsive(self):
+ """
+ Make sure the monitor is responsive by sending a command.
+
+ @return: True if responsive, False otherwise
+ """
+ try:
+ self._get_command_output("query-version")
+ return True
+ except MonitorError:
+ return False
+
+
+ def get_events(self):
+ """
+ Return a list of the asynchronous events received since the last
+ clear_events() call.
+
+ @return: A list of events (the objects returned have an "event" key)
+ @raise MonitorLockError: Raised if the lock cannot be acquired
+ """
+ if not self._acquire_lock(20):
+ raise MonitorLockError("Could not acquire exclusive lock to read "
+ "QMP events")
+ try:
+ self._read_objects()
+ return self._events[:]
+ finally:
+ self._lock.release()
+
+
+ def clear_events(self):
+ """
+ Clear the list of asynchronous events.
+
+ @raise MonitorLockError: Raised if the lock cannot be acquired
+ """
+ if not self._acquire_lock(20):
+ raise MonitorLockError("Could not acquire exclusive lock to clear "
+ "QMP event list")
+ self._events = []
+ self._lock.release()
+
+
+ # Command wrappers
+ # Note: all of the following functions raise exceptions in a similar manner
+ # to cmd() and _get_command_output().
+
+ def cmd(self, command, timeout=20):
+ """
+ Send a simple command with no parameters and return its output.
+ Should only be used for commands that take no parameters and are
+ implemented under the same name for both the human and QMP monitors.
+
+ @param command: Command to send
+ @param timeout: Time duration to wait for response
+ @return: The response to the command
+ @raise MonitorLockError: Raised if the lock cannot be acquired
+ @raise MonitorSendError: Raised if the command cannot be sent
+ @raise MonitorProtocolError: Raised if no response is received
+ """
+ return self._get_command_output(command, timeout=timeout)
+
+
+ def quit(self):
+ """
+ Send "quit" and return the response.
+ """
+ return self._get_command_output("quit")
+
+
+ def info(self, what):
+ """
+ Request info about something and return the response.
+ """
+ return self._get_command_output("query-%s" % what)
+
+
+ def query(self, what):
+ """
+ Alias for info.
+ """
+ return self.info(what)
+
+
+ def screendump(self, filename):
+ """
+ Request a screendump.
+
+ @param filename: Location for the screendump
+ @return: The response to the command
+ """
+ args = {"filename": filename}
+ return self._get_command_output("screendump", args)
+
+
+ def migrate(self, uri, full_copy=False, incremental_copy=False, wait=False):
+ """
+ Migrate.
+
+ @param uri: destination URI
+ @param full_copy: If true, migrate with full disk copy
+ @param incremental_copy: If true, migrate with incremental disk copy
+ @param wait: If true, wait for completion
+ @return: The response to the command
+ """
+ args = {"uri": uri,
+ "blk": full_copy,
+ "inc": incremental_copy}
+ return self._get_command_output("migrate", args)
+
+
+ def migrate_set_speed(self, value):
+ """
+ Set maximum speed (in bytes/sec) for migrations.
+
+ @param value: Speed in bytes/sec
+ @return: The response to the command
+ """
+ args = {"value": value}
+ return self._get_command_output("migrate_set_speed", args)
+
@@ -590,8 +590,10 @@ class VM:
while time.time() < end_time:
try:
if monitor_params.get("monitor_type") == "qmp":
- # Add a QMP monitor: not implemented yet
- monitor = None
+ # Add a QMP monitor
+ monitor = kvm_monitor.QMPMonitor(
+ monitor_name,
+ self.get_monitor_filename(monitor_name))
else:
# Add a "human" monitor
monitor = kvm_monitor.HumanMonitor(
@@ -600,7 +602,7 @@ class VM:
except kvm_monitor.MonitorError, e:
logging.warn(e)
else:
- if monitor and monitor.is_responsive():
+ if monitor.is_responsive():
break
time.sleep(1)
else: