@@ -52,6 +52,29 @@ enum vfio_user_command {
#define VFIO_USER_ERROR 0x20
+/*
+ * VFIO_USER_VERSION
+ */
+typedef struct {
+ VFIOUserHdr hdr;
+ uint16_t major;
+ uint16_t minor;
+ char capabilities[];
+} VFIOUserVersion;
+
+#define VFIO_USER_MAJOR_VER 0
+#define VFIO_USER_MINOR_VER 0
+
+#define VFIO_USER_CAP "capabilities"
+
+/* "capabilities" members */
+#define VFIO_USER_CAP_MAX_FDS "max_msg_fds"
+#define VFIO_USER_CAP_MAX_XFER "max_data_xfer_size"
+#define VFIO_USER_CAP_MIGR "migration"
+
+/* "migration" member */
+#define VFIO_USER_CAP_PGSIZE "pgsize"
+
#define VFIO_USER_DEF_MAX_FDS 8
#define VFIO_USER_MAX_MAX_FDS 16
@@ -81,5 +81,6 @@ void vfio_user_disconnect(VFIOProxy *proxy);
void vfio_user_set_handler(VFIODevice *vbasedev,
void (*handler)(void *opaque, VFIOUserMsg *msg),
void *reqarg);
+int vfio_user_validate_version(VFIODevice *vbasedev, Error **errp);
#endif /* VFIO_USER_H */
@@ -3512,6 +3512,12 @@ static void vfio_user_pci_realize(PCIDevice *pdev, Error **errp)
proxy->flags |= VFIO_PROXY_FORCE_QUEUED;
}
+ vfio_user_validate_version(vbasedev, &err);
+ if (err != NULL) {
+ error_propagate(errp, err);
+ goto error;
+ }
+
vbasedev->name = g_strdup_printf("VFIO user <%s>", udev->sock_name);
vbasedev->dev = DEVICE(vdev);
vbasedev->fd = -1;
@@ -3520,6 +3526,11 @@ static void vfio_user_pci_realize(PCIDevice *pdev, Error **errp)
vbasedev->ops = &vfio_user_pci_ops;
vbasedev->valid_ops = &vfio_pci_valid_ops;
+ return;
+
+error:
+ vfio_user_disconnect(proxy);
+ error_prepend(errp, VFIO_MSG_PREFIX, vdev->vbasedev.name);
}
static void vfio_user_instance_finalize(Object *obj)
@@ -23,12 +23,20 @@
#include "io/channel-socket.h"
#include "io/channel-util.h"
#include "sysemu/iothread.h"
+#include "qapi/qmp/qdict.h"
+#include "qapi/qmp/qjson.h"
+#include "qapi/qmp/qnull.h"
+#include "qapi/qmp/qstring.h"
+#include "qapi/qmp/qnum.h"
#include "user.h"
static uint64_t max_xfer_size = VFIO_USER_DEF_MAX_XFER;
+static uint64_t max_send_fds = VFIO_USER_DEF_MAX_FDS;
+static int wait_time = 1000; /* wait 1 sec for replies */
static IOThread *vfio_user_iothread;
static void vfio_user_shutdown(VFIOProxy *proxy);
+static int vfio_user_send_qio(VFIOProxy *proxy, VFIOUserMsg *msg);
static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
VFIOUserFDs *fds);
static VFIOUserFDs *vfio_user_getfds(int numfds);
@@ -36,9 +44,16 @@ static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg);
static void vfio_user_recv(void *opaque);
static int vfio_user_recv_one(VFIOProxy *proxy);
+static void vfio_user_send(void *opaque);
+static int vfio_user_send_one(VFIOProxy *proxy, VFIOUserMsg *msg);
static void vfio_user_cb(void *opaque);
static void vfio_user_request(void *opaque);
+static int vfio_user_send_queued(VFIOProxy *proxy, VFIOUserMsg *msg);
+static void vfio_user_send_wait(VFIOProxy *proxy, VFIOUserHdr *hdr,
+ VFIOUserFDs *fds, int rsize, bool nobql);
+static void vfio_user_request_msg(VFIOUserHdr *hdr, uint16_t cmd,
+ uint32_t size, uint32_t flags);
static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
@@ -57,6 +72,32 @@ static void vfio_user_shutdown(VFIOProxy *proxy)
qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL, NULL, NULL);
}
+static int vfio_user_send_qio(VFIOProxy *proxy, VFIOUserMsg *msg)
+{
+ VFIOUserFDs *fds = msg->fds;
+ struct iovec iov = {
+ .iov_base = msg->hdr,
+ .iov_len = msg->hdr->size,
+ };
+ size_t numfds = 0;
+ int ret, *fdp = NULL;
+ Error *local_err = NULL;
+
+ if (fds != NULL && fds->send_fds != 0) {
+ numfds = fds->send_fds;
+ fdp = fds->fds;
+ }
+
+ ret = qio_channel_writev_full(proxy->ioc, &iov, 1, fdp, numfds, &local_err);
+
+ if (ret == -1) {
+ vfio_user_set_error(msg->hdr, EIO);
+ vfio_user_shutdown(proxy);
+ error_report_err(local_err);
+ }
+ return ret;
+}
+
static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
VFIOUserFDs *fds)
{
@@ -310,6 +351,53 @@ err:
return -1;
}
+/*
+ * Send messages from outgoing queue when the socket buffer has space.
+ * If we deplete 'outgoing', remove ourselves from the poll list.
+ */
+static void vfio_user_send(void *opaque)
+{
+ VFIOProxy *proxy = opaque;
+ VFIOUserMsg *msg;
+
+ QEMU_LOCK_GUARD(&proxy->lock);
+
+ if (proxy->state == VFIO_PROXY_CONNECTED) {
+ while (!QTAILQ_EMPTY(&proxy->outgoing)) {
+ msg = QTAILQ_FIRST(&proxy->outgoing);
+ if (vfio_user_send_one(proxy, msg) < 0) {
+ return;
+ }
+ }
+ qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
+ vfio_user_recv, NULL, proxy);
+ }
+}
+
+/*
+ * Send a single message.
+ *
+ * Sent async messages are freed, others are moved to pending queue.
+ */
+static int vfio_user_send_one(VFIOProxy *proxy, VFIOUserMsg *msg)
+{
+ int ret;
+
+ ret = vfio_user_send_qio(proxy, msg);
+ if (ret < 0) {
+ return ret;
+ }
+
+ QTAILQ_REMOVE(&proxy->outgoing, msg, next);
+ if (msg->type == VFIO_MSG_ASYNC) {
+ vfio_user_recycle(proxy, msg);
+ } else {
+ QTAILQ_INSERT_TAIL(&proxy->pending, msg, next);
+ }
+
+ return 0;
+}
+
static void vfio_user_cb(void *opaque)
{
VFIOProxy *proxy = opaque;
@@ -370,6 +458,129 @@ static void vfio_user_request(void *opaque)
}
}
+/*
+ * Messages are queued onto the proxy's outgoing list.
+ *
+ * It handles 3 types of messages:
+ *
+ * async messages - replies and posted writes
+ *
+ * There will be no reply from the server, so message
+ * buffers are freed after they're sent.
+ *
+ * nowait messages - map/unmap during address space transactions
+ *
+ * These are also sent async, but a reply is expected so that
+ * vfio_wait_reqs() can wait for the youngest nowait request.
+ * They transition from the outgoing list to the pending list
+ * when sent, and are freed when the reply is received.
+ *
+ * wait messages - all other requests
+ *
+ * The reply to these messages is waited for by their caller.
+ * They also transition from outgoing to pending when sent, but
+ * the message buffer is returned to the caller with the reply
+ * contents. The caller is responsible for freeing these messages.
+ *
+ * As an optimization, if the outgoing list and the socket send
+ * buffer are empty, the message is sent inline instead of being
+ * added to the outgoing list. The rest of the transitions are
+ * unchanged.
+ *
+ * returns 0 if the message was sent or queued
+ * returns -1 on send error
+ */
+static int vfio_user_send_queued(VFIOProxy *proxy, VFIOUserMsg *msg)
+{
+ int ret;
+
+ /*
+ * Unsent outgoing msgs - add to tail
+ */
+ if (!QTAILQ_EMPTY(&proxy->outgoing)) {
+ QTAILQ_INSERT_TAIL(&proxy->outgoing, msg, next);
+ return 0;
+ }
+
+ /*
+ * Try inline - if blocked, queue it and kick send poller
+ */
+ if (proxy->flags & VFIO_PROXY_FORCE_QUEUED) {
+ ret = QIO_CHANNEL_ERR_BLOCK;
+ } else {
+ ret = vfio_user_send_qio(proxy, msg);
+ }
+ if (ret == QIO_CHANNEL_ERR_BLOCK) {
+ QTAILQ_INSERT_HEAD(&proxy->outgoing, msg, next);
+ qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
+ vfio_user_recv, vfio_user_send,
+ proxy);
+ return 0;
+ }
+ if (ret == -1) {
+ return ret;
+ }
+
+ /*
+ * Sent - free async, add others to pending
+ */
+ if (msg->type == VFIO_MSG_ASYNC) {
+ vfio_user_recycle(proxy, msg);
+ } else {
+ QTAILQ_INSERT_TAIL(&proxy->pending, msg, next);
+ }
+
+ return 0;
+}
+
+static void vfio_user_send_wait(VFIOProxy *proxy, VFIOUserHdr *hdr,
+ VFIOUserFDs *fds, int rsize, bool nobql)
+{
+ VFIOUserMsg *msg;
+ bool iolock = false;
+ int ret;
+
+ if (hdr->flags & VFIO_USER_NO_REPLY) {
+ error_printf("vfio_user_send_wait on async message\n");
+ return;
+ }
+
+ /*
+ * We may block later, so use a per-proxy lock and drop
+ * BQL while we sleep unless 'nobql' says not to.
+ */
+ qemu_mutex_lock(&proxy->lock);
+ if (!nobql) {
+ iolock = qemu_mutex_iothread_locked();
+ if (iolock) {
+ qemu_mutex_unlock_iothread();
+ }
+ }
+
+ msg = vfio_user_getmsg(proxy, hdr, fds);
+ msg->id = hdr->id;
+ msg->rsize = rsize ? rsize : hdr->size;
+ msg->type = VFIO_MSG_WAIT;
+
+ ret = vfio_user_send_queued(proxy, msg);
+
+ if (ret == 0) {
+ while (!msg->complete) {
+ if (!qemu_cond_timedwait(&msg->cv, &proxy->lock, wait_time)) {
+ vfio_user_set_error(hdr, ETIMEDOUT);
+ break;
+ }
+ }
+ }
+ vfio_user_recycle(proxy, msg);
+
+ /* lock order is BQL->proxy - don't hold proxy when getting BQL */
+ qemu_mutex_unlock(&proxy->lock);
+ if (iolock) {
+ qemu_mutex_lock_iothread();
+ }
+}
+
static QLIST_HEAD(, VFIOProxy) vfio_user_sockets =
QLIST_HEAD_INITIALIZER(vfio_user_sockets);
@@ -494,3 +705,203 @@ void vfio_user_disconnect(VFIOProxy *proxy)
g_free(proxy->sockname);
g_free(proxy);
}
+
+static void vfio_user_request_msg(VFIOUserHdr *hdr, uint16_t cmd,
+ uint32_t size, uint32_t flags)
+{
+ static uint16_t next_id;
+
+ hdr->id = qatomic_fetch_inc(&next_id);
+ hdr->command = cmd;
+ hdr->size = size;
+ hdr->flags = (flags & ~VFIO_USER_TYPE) | VFIO_USER_REQUEST;
+ hdr->error_reply = 0;
+}
+
+struct cap_entry {
+ const char *name;
+ int (*check)(QObject *qobj, Error **errp);
+};
+
+static int caps_parse(QDict *qdict, struct cap_entry caps[], Error **errp)
+{
+ QObject *qobj;
+ struct cap_entry *p;
+
+ for (p = caps; p->name != NULL; p++) {
+ qobj = qdict_get(qdict, p->name);
+ if (qobj != NULL) {
+ if (p->check(qobj, errp)) {
+ return -1;
+ }
+ qdict_del(qdict, p->name);
+ }
+ }
+
+ /* warning, for now */
+ if (qdict_size(qdict) != 0) {
+ error_printf("spurious capabilities\n");
+ }
+ return 0;
+}
+
+static int check_pgsize(QObject *qobj, Error **errp)
+{
+ QNum *qn = qobject_to(QNum, qobj);
+ uint64_t pgsize;
+
+ if (qn == NULL || !qnum_get_try_uint(qn, &pgsize)) {
+ error_setg(errp, "malformed %s", VFIO_USER_CAP_PGSIZE);
+ return -1;
+ }
+ return pgsize == 4096 ? 0 : -1;
+}
+
+static struct cap_entry caps_migr[] = {
+ { VFIO_USER_CAP_PGSIZE, check_pgsize },
+ { NULL }
+};
+
+static int check_max_fds(QObject *qobj, Error **errp)
+{
+ QNum *qn = qobject_to(QNum, qobj);
+
+ if (qn == NULL || !qnum_get_try_uint(qn, &max_send_fds) ||
+ max_send_fds > VFIO_USER_MAX_MAX_FDS) {
+ error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_FDS);
+ return -1;
+ }
+ return 0;
+}
+
+static int check_max_xfer(QObject *qobj, Error **errp)
+{
+ QNum *qn = qobject_to(QNum, qobj);
+
+ if (qn == NULL || !qnum_get_try_uint(qn, &max_xfer_size) ||
+ max_xfer_size > VFIO_USER_MAX_MAX_XFER) {
+ error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_XFER);
+ return -1;
+ }
+ return 0;
+}
+
+static int check_migr(QObject *qobj, Error **errp)
+{
+ QDict *qdict = qobject_to(QDict, qobj);
+
+ if (qdict == NULL || caps_parse(qdict, caps_migr, errp)) {
+ error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_FDS);
+ return -1;
+ }
+ return 0;
+}
+
+static struct cap_entry caps_cap[] = {
+ { VFIO_USER_CAP_MAX_FDS, check_max_fds },
+ { VFIO_USER_CAP_MAX_XFER, check_max_xfer },
+ { VFIO_USER_CAP_MIGR, check_migr },
+ { NULL }
+};
+
+static int check_cap(QObject *qobj, Error **errp)
+{
+ QDict *qdict = qobject_to(QDict, qobj);
+
+ if (qdict == NULL || caps_parse(qdict, caps_cap, errp)) {
+ error_setg(errp, "malformed %s", VFIO_USER_CAP);
+ return -1;
+ }
+ return 0;
+}
+
+static struct cap_entry ver_0_0[] = {
+ { VFIO_USER_CAP, check_cap },
+ { NULL }
+};
+
+static int caps_check(int minor, const char *caps, Error **errp)
+{
+ QObject *qobj;
+ QDict *qdict;
+ int ret;
+
+ qobj = qobject_from_json(caps, NULL);
+ if (qobj == NULL) {
+ error_setg(errp, "malformed capabilities %s", caps);
+ return -1;
+ }
+ qdict = qobject_to(QDict, qobj);
+ if (qdict == NULL) {
+ error_setg(errp, "capabilities %s not an object", caps);
+ qobject_unref(qobj);
+ return -1;
+ }
+ ret = caps_parse(qdict, ver_0_0, errp);
+
+ qobject_unref(qobj);
+ return ret;
+}
+
+static GString *caps_json(void)
+{
+ QDict *dict = qdict_new();
+ QDict *capdict = qdict_new();
+ QDict *migdict = qdict_new();
+ GString *str;
+
+ qdict_put_int(migdict, VFIO_USER_CAP_PGSIZE, 4096);
+ qdict_put_obj(capdict, VFIO_USER_CAP_MIGR, QOBJECT(migdict));
+
+ qdict_put_int(capdict, VFIO_USER_CAP_MAX_FDS, VFIO_USER_MAX_MAX_FDS);
+ qdict_put_int(capdict, VFIO_USER_CAP_MAX_XFER, VFIO_USER_DEF_MAX_XFER);
+
+ qdict_put_obj(dict, VFIO_USER_CAP, QOBJECT(capdict));
+
+ str = qobject_to_json(QOBJECT(dict));
+ qobject_unref(dict);
+ return str;
+}
+
+int vfio_user_validate_version(VFIODevice *vbasedev, Error **errp)
+{
+ g_autofree VFIOUserVersion *msgp;
+ GString *caps;
+ char *reply;
+ int size, caplen;
+
+ caps = caps_json();
+ caplen = caps->len + 1;
+ size = sizeof(*msgp) + caplen;
+ msgp = g_malloc0(size);
+
+ vfio_user_request_msg(&msgp->hdr, VFIO_USER_VERSION, size, 0);
+ msgp->major = VFIO_USER_MAJOR_VER;
+ msgp->minor = VFIO_USER_MINOR_VER;
+ memcpy(&msgp->capabilities, caps->str, caplen);
+ g_string_free(caps, true);
+
+ vfio_user_send_wait(vbasedev->proxy, &msgp->hdr, NULL, 0, false);
+ if (msgp->hdr.flags & VFIO_USER_ERROR) {
+ error_setg_errno(errp, msgp->hdr.error_reply, "version reply");
+ return -1;
+ }
+
+ if (msgp->major != VFIO_USER_MAJOR_VER ||
+ msgp->minor > VFIO_USER_MINOR_VER) {
+ error_setg(errp, "incompatible server version");
+ return -1;
+ }
+
+ reply = msgp->capabilities;
+ if (reply[msgp->hdr.size - sizeof(*msgp) - 1] != '\0') {
+ error_setg(errp, "corrupt version reply");
+ return -1;
+ }
+
+ if (caps_check(msgp->minor, reply, errp) != 0) {
+ return -1;
+ }
+
+ return 0;
+}