Message ID | 3d3befd308bacb7ec9f3ccd8f99e5184261279b2.1629131628.git.elena.ufimtseva@oracle.com (mailing list archive) |
---|---|
State | New, archived |
Series | vfio-user implementation |
On Mon, Aug 16, 2021 at 09:42:38AM -0700, Elena Ufimtseva wrote:
> @@ -62,5 +65,10 @@ typedef struct VFIOProxy {
>
>  VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
>  void vfio_user_disconnect(VFIOProxy *proxy);
> +void vfio_user_set_reqhandler(VFIODevice *vbasdev,

"vbasedev" for consistency?

> +                              int (*handler)(void *opaque, char *buf,
> +                                             VFIOUserFDs *fds),
> +                              void *reqarg);

The handler callback is undocumented. What context does it run in, what
do the arguments mean, and what should the function return? Please
document it so it's easy for others to modify this code in the future
without reverse-engineering the assumptions behind it.

> +void vfio_user_recv(void *opaque)
> +{
> +    VFIODevice *vbasedev = opaque;
> +    VFIOProxy *proxy = vbasedev->proxy;
> +    VFIOUserReply *reply = NULL;
> +    g_autofree int *fdp = NULL;
> +    VFIOUserFDs reqfds = { 0, 0, fdp };
> +    VFIOUserHdr msg;
> +    struct iovec iov = {
> +        .iov_base = &msg,
> +        .iov_len = sizeof(msg),
> +    };
> +    bool isreply;
> +    int i, ret;
> +    size_t msgleft, numfds = 0;
> +    char *data = NULL;
> +    g_autofree char *buf = NULL;
> +    Error *local_err = NULL;
> +
> +    qemu_mutex_lock(&proxy->lock);
> +    if (proxy->state == VFIO_PROXY_CLOSING) {
> +        qemu_mutex_unlock(&proxy->lock);
> +        return;
> +    }
> +
> +    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
> +                                 &local_err);

This is a blocking call. My understanding is that the IOThread is shared
by all vfio-user devices, so other devices will have to wait if one of
them is acting up (e.g. the device emulation process sent less than
sizeof(msg) bytes).

While we're blocked in this function the proxy device cannot be
hot-removed since proxy->lock is held.

It would be more robust to use the event loop to avoid blocking. There
could be a per-connection receiver coroutine that calls
qio_channel_readv_full_all_eof() (it yields the coroutine if reading
would block).

> +    /*
> +     * Replies signal a waiter, requests get processed by vfio code
> +     * that may assume the iothread lock is held.
> +     */
> +    if (isreply) {
> +        reply->complete = 1;
> +        if (!reply->nowait) {
> +            qemu_cond_signal(&reply->cv);
> +        } else {
> +            if (msg.flags & VFIO_USER_ERROR) {
> +                error_printf("vfio_user_rcv error reply on async request ");
> +                error_printf("command %x error %s\n", msg.command,
> +                             strerror(msg.error_reply));
> +            }
> +            /* just free it if no one is waiting */
> +            reply->nowait = 0;
> +            if (proxy->last_nowait == reply) {
> +                proxy->last_nowait = NULL;
> +            }
> +            g_free(reply->msg);
> +            QTAILQ_INSERT_HEAD(&proxy->free, reply, next);
> +        }
> +        qemu_mutex_unlock(&proxy->lock);
> +    } else {
> +        qemu_mutex_unlock(&proxy->lock);
> +        qemu_mutex_lock_iothread();

The fact that proxy->request() runs with the BQL suggests that VFIO
communication should take place in the main event loop thread instead of
a separate IOThread.

> +        /*
> +         * make sure proxy wasn't closed while we waited
> +         * checking state without holding the proxy lock is safe
> +         * since it's only set to CLOSING when BQL is held
> +         */
> +        if (proxy->state != VFIO_PROXY_CLOSING) {
> +            ret = proxy->request(proxy->reqarg, buf, &reqfds);

The request() callback in an earlier patch is a noop for the client
implementation. Who frees passed fds?

> +            if (ret < 0 && !(msg.flags & VFIO_USER_NO_REPLY)) {
> +                vfio_user_send_reply(proxy, buf, ret);
> +            }
> +        }
> +        qemu_mutex_unlock_iothread();
> +    }
> +    return;
> +
> +fatal:
> +    vfio_user_shutdown(proxy);
> +    proxy->state = VFIO_PROXY_RECV_ERROR;
> +
> +err:
> +    for (i = 0; i < numfds; i++) {
> +        close(fdp[i]);
> +    }
> +    if (reply != NULL) {
> +        /* force an error to keep sending thread from hanging */
> +        reply->msg->flags |= VFIO_USER_ERROR;
> +        reply->msg->error_reply = EINVAL;
> +        reply->complete = 1;
> +        qemu_cond_signal(&reply->cv);

What about fd passing? The actual fds have been closed already in fdp[]
but reply has a copy too.

What about the nowait case? If no one is waiting on reply->cv then this
reply will be leaked?

Stefan
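For readers following the review, below is a minimal sketch of the kind of
documentation being asked for. The described guarantees (iothread context,
BQL held, return-value convention) are inferred from the patch at the end of
this thread and are assumptions, not text from the posted series:

/* Sketch only, not from the posted series: possible handler documentation */
/**
 * vfio_user_set_reqhandler:
 * @vbasedev: device whose proxy should receive incoming requests
 * @handler: request callback; runs in the proxy iothread with the BQL
 *           held.  @opaque is @reqarg, @buf holds the complete message
 *           beginning with a VFIOUserHdr, and @fds carries any file
 *           descriptors received with it.  Return 0 (or a reply size)
 *           on success or a negative errno; on a negative return the
 *           receive path sends an error reply unless the request had
 *           VFIO_USER_NO_REPLY set.
 * @reqarg: opaque pointer passed back to @handler
 */
void vfio_user_set_reqhandler(VFIODevice *vbasedev,
                              int (*handler)(void *opaque, char *buf,
                                             VFIOUserFDs *fds),
                              void *reqarg);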
> On Aug 24, 2021, at 8:14 AM, Stefan Hajnoczi <stefanha@redhat.com> wrote:
>
> On Mon, Aug 16, 2021 at 09:42:38AM -0700, Elena Ufimtseva wrote:
>> @@ -62,5 +65,10 @@ typedef struct VFIOProxy {
>>
>>  VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
>>  void vfio_user_disconnect(VFIOProxy *proxy);
>> +void vfio_user_set_reqhandler(VFIODevice *vbasdev,
>
> "vbasedev" for consistency?
>

OK

>> +                              int (*handler)(void *opaque, char *buf,
>> +                                             VFIOUserFDs *fds),
>> +                              void *reqarg);
>
> The handler callback is undocumented. What context does it run in, what
> do the arguments mean, and what should the function return? Please
> document it so it's easy for others to modify this code in the future
> without reverse-engineering the assumptions behind it.
>

OK

>> +void vfio_user_recv(void *opaque)
>> +{
>> +    VFIODevice *vbasedev = opaque;
>> +    VFIOProxy *proxy = vbasedev->proxy;
>> +    VFIOUserReply *reply = NULL;
>> +    g_autofree int *fdp = NULL;
>> +    VFIOUserFDs reqfds = { 0, 0, fdp };
>> +    VFIOUserHdr msg;
>> +    struct iovec iov = {
>> +        .iov_base = &msg,
>> +        .iov_len = sizeof(msg),
>> +    };
>> +    bool isreply;
>> +    int i, ret;
>> +    size_t msgleft, numfds = 0;
>> +    char *data = NULL;
>> +    g_autofree char *buf = NULL;
>> +    Error *local_err = NULL;
>> +
>> +    qemu_mutex_lock(&proxy->lock);
>> +    if (proxy->state == VFIO_PROXY_CLOSING) {
>> +        qemu_mutex_unlock(&proxy->lock);
>> +        return;
>> +    }
>> +
>> +    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
>> +                                 &local_err);
>
> This is a blocking call. My understanding is that the IOThread is shared
> by all vfio-user devices, so other devices will have to wait if one of
> them is acting up (e.g. the device emulation process sent less than
> sizeof(msg) bytes).
>
> While we're blocked in this function the proxy device cannot be
> hot-removed since proxy->lock is held.
>
> It would be more robust to use the event loop to avoid blocking. There
> could be a per-connection receiver coroutine that calls
> qio_channel_readv_full_all_eof() (it yields the coroutine if reading
> would block).
>

I thought the main loop uses BQL, which I don’t need for most message
processing. The blocking behavior can be fixed with FIONREAD beforehand
to detect a message with fewer bytes than in a header.

>> +    /*
>> +     * Replies signal a waiter, requests get processed by vfio code
>> +     * that may assume the iothread lock is held.
>> +     */
>> +    if (isreply) {
>> +        reply->complete = 1;
>> +        if (!reply->nowait) {
>> +            qemu_cond_signal(&reply->cv);
>> +        } else {
>> +            if (msg.flags & VFIO_USER_ERROR) {
>> +                error_printf("vfio_user_rcv error reply on async request ");
>> +                error_printf("command %x error %s\n", msg.command,
>> +                             strerror(msg.error_reply));
>> +            }
>> +            /* just free it if no one is waiting */
>> +            reply->nowait = 0;
>> +            if (proxy->last_nowait == reply) {
>> +                proxy->last_nowait = NULL;
>> +            }
>> +            g_free(reply->msg);
>> +            QTAILQ_INSERT_HEAD(&proxy->free, reply, next);
>> +        }
>> +        qemu_mutex_unlock(&proxy->lock);
>> +    } else {
>> +        qemu_mutex_unlock(&proxy->lock);
>> +        qemu_mutex_lock_iothread();
>
> The fact that proxy->request() runs with the BQL suggests that VFIO
> communication should take place in the main event loop thread instead of
> a separate IOThread.
>

See the last reply. Using the main event loop optimizes the least common
case.

>> +        /*
>> +         * make sure proxy wasn't closed while we waited
>> +         * checking state without holding the proxy lock is safe
>> +         * since it's only set to CLOSING when BQL is held
>> +         */
>> +        if (proxy->state != VFIO_PROXY_CLOSING) {
>> +            ret = proxy->request(proxy->reqarg, buf, &reqfds);
>
> The request() callback in an earlier patch is a noop for the client
> implementation. Who frees passed fds?
>

Right now no server->client requests send fd’s, but I do need a single
point where they are consumed if an error is returned.

>> +            if (ret < 0 && !(msg.flags & VFIO_USER_NO_REPLY)) {
>> +                vfio_user_send_reply(proxy, buf, ret);
>> +            }
>> +        }
>> +        qemu_mutex_unlock_iothread();
>> +    }
>> +    return;
>> +
>> +fatal:
>> +    vfio_user_shutdown(proxy);
>> +    proxy->state = VFIO_PROXY_RECV_ERROR;
>> +
>> +err:
>> +    for (i = 0; i < numfds; i++) {
>> +        close(fdp[i]);
>> +    }
>> +    if (reply != NULL) {
>> +        /* force an error to keep sending thread from hanging */
>> +        reply->msg->flags |= VFIO_USER_ERROR;
>> +        reply->msg->error_reply = EINVAL;
>> +        reply->complete = 1;
>> +        qemu_cond_signal(&reply->cv);
>
> What about fd passing? The actual fds have been closed already in fdp[]
> but reply has a copy too.
>

If the sender gets an error, it won’t be using the fd’s. I can zero
reply->fds to make this clearer.

> What about the nowait case? If no one is waiting on reply->cv then this
> reply will be leaked?
>

This looks like a leak.

JJ
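To illustrate the FIONREAD idea mentioned above, here is a minimal sketch.
The helper name vfio_user_header_ready() and how it would be wired into the
receive path are assumptions for illustration, not code from the series:

#include <sys/ioctl.h>

/*
 * Sketch only (not from the series): check how many bytes are queued on
 * the socket before reading, so a partial header does not block the
 * shared iothread.
 */
static bool vfio_user_header_ready(int sockfd)
{
    int avail = 0;

    if (ioctl(sockfd, FIONREAD, &avail) < 0) {
        return false;   /* treat errors as "not readable yet" */
    }
    return avail >= sizeof(VFIOUserHdr);
}

Whether a pre-check like this is sufficient is exactly what the follow-up
below debates; Stefan argues for an event-loop/coroutine receiver instead.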
On Mon, Aug 30, 2021 at 03:04:08AM +0000, John Johnson wrote:
>
>
> > On Aug 24, 2021, at 8:14 AM, Stefan Hajnoczi <stefanha@redhat.com> wrote:
> >
> > On Mon, Aug 16, 2021 at 09:42:38AM -0700, Elena Ufimtseva wrote:
> >> @@ -62,5 +65,10 @@ typedef struct VFIOProxy {
> >>
> >>  VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
> >>  void vfio_user_disconnect(VFIOProxy *proxy);
> >> +void vfio_user_set_reqhandler(VFIODevice *vbasdev,
> >
> > "vbasedev" for consistency?
> >
>
> OK
>
> >> +                              int (*handler)(void *opaque, char *buf,
> >> +                                             VFIOUserFDs *fds),
> >> +                              void *reqarg);
> >
> > The handler callback is undocumented. What context does it run in, what
> > do the arguments mean, and what should the function return? Please
> > document it so it's easy for others to modify this code in the future
> > without reverse-engineering the assumptions behind it.
> >
>
> OK
>
> >> +void vfio_user_recv(void *opaque)
> >> +{
> >> +    VFIODevice *vbasedev = opaque;
> >> +    VFIOProxy *proxy = vbasedev->proxy;
> >> +    VFIOUserReply *reply = NULL;
> >> +    g_autofree int *fdp = NULL;
> >> +    VFIOUserFDs reqfds = { 0, 0, fdp };
> >> +    VFIOUserHdr msg;
> >> +    struct iovec iov = {
> >> +        .iov_base = &msg,
> >> +        .iov_len = sizeof(msg),
> >> +    };
> >> +    bool isreply;
> >> +    int i, ret;
> >> +    size_t msgleft, numfds = 0;
> >> +    char *data = NULL;
> >> +    g_autofree char *buf = NULL;
> >> +    Error *local_err = NULL;
> >> +
> >> +    qemu_mutex_lock(&proxy->lock);
> >> +    if (proxy->state == VFIO_PROXY_CLOSING) {
> >> +        qemu_mutex_unlock(&proxy->lock);
> >> +        return;
> >> +    }
> >> +
> >> +    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
> >> +                                 &local_err);
> >
> > This is a blocking call. My understanding is that the IOThread is shared
> > by all vfio-user devices, so other devices will have to wait if one of
> > them is acting up (e.g. the device emulation process sent less than
> > sizeof(msg) bytes).
> >
> > While we're blocked in this function the proxy device cannot be
> > hot-removed since proxy->lock is held.
> >
> > It would be more robust to use the event loop to avoid blocking. There
> > could be a per-connection receiver coroutine that calls
> > qio_channel_readv_full_all_eof() (it yields the coroutine if reading
> > would block).
> >
>
> I thought the main loop uses BQL, which I don’t need for most
> message processing. The blocking behavior can be fixed with FIONREAD
> beforehand to detect a message with fewer bytes than in a header.

It's I/O-bound work, exactly what the main loop was intended for.

I'm not sure the BQL can be avoided anyway:

- The vfio-user client runs under the BQL (a vCPU thread).

- The vfio-user server needs to hold the BQL since most QEMU device
  models assume they are running under the BQL.

The network communication code doesn't need to know about the BQL
though. Event-driven code (code that runs in an AioContext) can rely on
the fact that its callbacks only execute in the AioContext, i.e. in one
thread at any given time. The code probably doesn't need explicit BQL
lock/unlock and can run safely in another IOThread if the user wishes
(I would leave that up to the user, e.g. -device
vfio-user-pci,iothread=iothread0, instead of creating a dedicated
IOThread that is shared for all vfio-user communication).

See nbd/server.c for an example of doing event-driven network I/O with
coroutines.

Stefan
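As a rough illustration of the receiver coroutine Stefan describes, a sketch
follows. The function and field names are assumptions, and the
qio_channel_readv_full_all_eof() signature shown is recalled from the QEMU io
channel API rather than taken from this series, so treat it as approximate:

/* Sketch only: per-connection receive loop running as a coroutine. */
static void coroutine_fn vfio_user_recv_co(void *opaque)
{
    VFIOProxy *proxy = opaque;

    for (;;) {
        VFIOUserHdr msg;
        struct iovec iov = { .iov_base = &msg, .iov_len = sizeof(msg) };
        g_autofree int *fds = NULL;
        size_t nfds = 0;
        Error *local_err = NULL;
        int ret;

        /* yields instead of blocking when no data is available */
        ret = qio_channel_readv_full_all_eof(proxy->ioc, &iov, 1,
                                             &fds, &nfds, &local_err);
        if (ret <= 0) {
            /* EOF or error: tear down the connection */
            break;
        }

        /* read the message body and dispatch the reply/request as today */
    }
}

Such a coroutine would be created with qemu_coroutine_create() and entered
from whichever AioContext the proxy's channel is attached to, which is also
what would make a per-device iothread=... property straightforward.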
diff --git a/hw/vfio/user-protocol.h b/hw/vfio/user-protocol.h
new file mode 100644
index 0000000000..27062cb910
--- /dev/null
+++ b/hw/vfio/user-protocol.h
@@ -0,0 +1,62 @@
+#ifndef VFIO_USER_PROTOCOL_H
+#define VFIO_USER_PROTOCOL_H
+
+/*
+ * vfio protocol over a UNIX socket.
+ *
+ * Copyright © 2018, 2021 Oracle and/or its affiliates.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ * Each message has a standard header that describes the command
+ * being sent, which is almost always a VFIO ioctl().
+ *
+ * The header may be followed by command-specific data, such as the
+ * region and offset info for read and write commands.
+ */
+
+typedef struct {
+    uint16_t id;
+    uint16_t command;
+    uint32_t size;
+    uint32_t flags;
+    uint32_t error_reply;
+} VFIOUserHdr;
+
+/* VFIOUserHdr commands */
+enum vfio_user_command {
+    VFIO_USER_VERSION                   = 1,
+    VFIO_USER_DMA_MAP                   = 2,
+    VFIO_USER_DMA_UNMAP                 = 3,
+    VFIO_USER_DEVICE_GET_INFO           = 4,
+    VFIO_USER_DEVICE_GET_REGION_INFO    = 5,
+    VFIO_USER_DEVICE_GET_REGION_IO_FDS  = 6,
+    VFIO_USER_DEVICE_GET_IRQ_INFO       = 7,
+    VFIO_USER_DEVICE_SET_IRQS           = 8,
+    VFIO_USER_REGION_READ               = 9,
+    VFIO_USER_REGION_WRITE              = 10,
+    VFIO_USER_DMA_READ                  = 11,
+    VFIO_USER_DMA_WRITE                 = 12,
+    VFIO_USER_DEVICE_RESET              = 13,
+    VFIO_USER_DIRTY_PAGES               = 14,
+    VFIO_USER_MAX,
+};
+
+/* VFIOUserHdr flags */
+#define VFIO_USER_REQUEST       0x0
+#define VFIO_USER_REPLY         0x1
+#define VFIO_USER_TYPE          0xF
+
+#define VFIO_USER_NO_REPLY      0x10
+#define VFIO_USER_ERROR         0x20
+
+
+#define VFIO_USER_DEF_MAX_FDS   8
+#define VFIO_USER_MAX_MAX_FDS   16
+
+#define VFIO_USER_DEF_MAX_XFER  (1024 * 1024)
+#define VFIO_USER_MAX_MAX_XFER  (64 * 1024 * 1024)
+
+
+#endif /* VFIO_USER_PROTOCOL_H */
diff --git a/hw/vfio/user.h b/hw/vfio/user.h
index 62b2d03d56..905e374e12 100644
--- a/hw/vfio/user.h
+++ b/hw/vfio/user.h
@@ -11,6 +11,8 @@
  *
  */
 
+#include "user-protocol.h"
+
 typedef struct {
     int send_fds;
     int recv_fds;
@@ -19,6 +21,7 @@ typedef struct {
 
 typedef struct VFIOUserReply {
     QTAILQ_ENTRY(VFIOUserReply) next;
+    VFIOUserHdr *msg;
     VFIOUserFDs *fds;
     uint32_t rsize;
     uint32_t id;
@@ -62,5 +65,10 @@ typedef struct VFIOProxy {
 
 VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
 void vfio_user_disconnect(VFIOProxy *proxy);
+void vfio_user_set_reqhandler(VFIODevice *vbasdev,
+                              int (*handler)(void *opaque, char *buf,
+                                             VFIOUserFDs *fds),
+                              void *reqarg);
+void vfio_user_send_reply(VFIOProxy *proxy, char *buf, int ret);
 
 #endif /* VFIO_USER_H */
diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
index 7c2d245ca5..7005d9f891 100644
--- a/hw/vfio/pci.c
+++ b/hw/vfio/pci.c
@@ -3333,6 +3333,11 @@ type_init(register_vfio_pci_dev_type)
  * vfio-user routines.
  */
 
+static int vfio_user_pci_process_req(void *opaque, char *buf, VFIOUserFDs *fds)
+{
+    return 0;
+}
+
 /*
  * Emulated devices don't use host hot reset
  */
@@ -3386,6 +3391,7 @@ static void vfio_user_pci_realize(PCIDevice *pdev, Error **errp)
         return;
     }
     vbasedev->proxy = proxy;
+    vfio_user_set_reqhandler(vbasedev, vfio_user_pci_process_req, vdev);
 
     if (udev->secure_dma) {
         proxy->flags |= VFIO_PROXY_SECURE;
diff --git a/hw/vfio/user.c b/hw/vfio/user.c
index 3bd304e036..2fcc77d997 100644
--- a/hw/vfio/user.c
+++ b/hw/vfio/user.c
@@ -25,8 +25,15 @@
 #include "sysemu/iothread.h"
 #include "user.h"
 
+static uint64_t max_xfer_size = VFIO_USER_DEF_MAX_XFER;
 static IOThread *vfio_user_iothread;
+
 static void vfio_user_shutdown(VFIOProxy *proxy);
+static void vfio_user_recv(void *opaque);
+static void vfio_user_send_locked(VFIOProxy *proxy, VFIOUserHdr *msg,
+                                  VFIOUserFDs *fds);
+static void vfio_user_send(VFIOProxy *proxy, VFIOUserHdr *msg,
+                           VFIOUserFDs *fds);
 
 
 /*
@@ -36,6 +43,67 @@ static void vfio_user_shutdown(VFIOProxy *proxy);
 static void vfio_user_shutdown(VFIOProxy *proxy)
 {
     qio_channel_shutdown(proxy->ioc, QIO_CHANNEL_SHUTDOWN_READ, NULL);
+    qio_channel_set_aio_fd_handler(proxy->ioc,
+                                   iothread_get_aio_context(vfio_user_iothread),
+                                   NULL, NULL, NULL);
+}
+
+static void vfio_user_send_locked(VFIOProxy *proxy, VFIOUserHdr *msg,
+                                  VFIOUserFDs *fds)
+{
+    struct iovec iov = {
+        .iov_base = msg,
+        .iov_len = msg->size,
+    };
+    size_t numfds = 0;
+    int msgleft, ret, *fdp = NULL;
+    char *buf;
+    Error *local_err = NULL;
+
+    if (proxy->state != VFIO_PROXY_CONNECTED) {
+        msg->flags |= VFIO_USER_ERROR;
+        msg->error_reply = ECONNRESET;
+        return;
+    }
+
+    if (fds != NULL && fds->send_fds != 0) {
+        numfds = fds->send_fds;
+        fdp = fds->fds;
+    }
+
+    ret = qio_channel_writev_full(proxy->ioc, &iov, 1, fdp, numfds, &local_err);
+    if (ret < 0) {
+        goto err;
+    }
+    if (ret == msg->size) {
+        return;
+    }
+
+    buf = iov.iov_base + ret;
+    msgleft = iov.iov_len - ret;
+    do {
+        ret = qio_channel_write(proxy->ioc, buf, msgleft, &local_err);
+        if (ret < 0) {
+            goto err;
+        }
+        buf += ret;
+        msgleft -= ret;
+    } while (msgleft != 0);
+    return;
+
+err:
+    msg->flags |= VFIO_USER_ERROR;
+    msg->error_reply = EIO;
+    error_report_err(local_err);
+}
+
+static void vfio_user_send(VFIOProxy *proxy, VFIOUserHdr *msg,
+                           VFIOUserFDs *fds)
+{
+
+    qemu_mutex_lock(&proxy->lock);
+    vfio_user_send_locked(proxy, msg, fds);
+    qemu_mutex_unlock(&proxy->lock);
 }
 
 
@@ -43,6 +111,213 @@ static void vfio_user_shutdown(VFIOProxy *proxy)
  * Functions only called by iothread
  */
 
+void vfio_user_send_reply(VFIOProxy *proxy, char *buf, int ret)
+{
+    VFIOUserHdr *hdr = (VFIOUserHdr *)buf;
+
+    /*
+     * convert header to associated reply
+     * positive ret is reply size, negative is error code
+     */
+    hdr->flags = VFIO_USER_REPLY;
+    if (ret >= sizeof(VFIOUserHdr)) {
+        hdr->size = ret;
+    } else if (ret < 0) {
+        hdr->flags |= VFIO_USER_ERROR;
+        hdr->error_reply = -ret;
+        hdr->size = sizeof(*hdr);
+    } else {
+        error_printf("vfio_user_send_reply - size too small\n");
+        return;
+    }
+    vfio_user_send(proxy, hdr, NULL);
+}
+
+void vfio_user_recv(void *opaque)
+{
+    VFIODevice *vbasedev = opaque;
+    VFIOProxy *proxy = vbasedev->proxy;
+    VFIOUserReply *reply = NULL;
+    g_autofree int *fdp = NULL;
+    VFIOUserFDs reqfds = { 0, 0, fdp };
+    VFIOUserHdr msg;
+    struct iovec iov = {
+        .iov_base = &msg,
+        .iov_len = sizeof(msg),
+    };
+    bool isreply;
+    int i, ret;
+    size_t msgleft, numfds = 0;
+    char *data = NULL;
+    g_autofree char *buf = NULL;
+    Error *local_err = NULL;
+
+    qemu_mutex_lock(&proxy->lock);
+    if (proxy->state == VFIO_PROXY_CLOSING) {
+        qemu_mutex_unlock(&proxy->lock);
+        return;
+    }
+
+    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
+                                 &local_err);
+    if (ret <= 0) {
+        /* read error or other side closed connection */
+        goto fatal;
+    }
+
+    if (ret < sizeof(msg)) {
+        error_setg(&local_err, "vfio_user_recv short read of header");
+        goto err;
+    }
+    if (msg.size < sizeof(VFIOUserHdr)) {
+        error_setg(&local_err, "vfio_user_recv bad header size");
+        goto err;
+    }
+
+    /*
+     * For replies, find the matching pending request
+     */
+    switch (msg.flags & VFIO_USER_TYPE) {
+    case VFIO_USER_REQUEST:
+        isreply = 0;
+        break;
+    case VFIO_USER_REPLY:
+        isreply = 1;
+        break;
+    default:
+        error_setg(&local_err, "vfio_user_recv unknown message type");
+        goto err;
+    }
+
+    if (isreply) {
+        QTAILQ_FOREACH(reply, &proxy->pending, next) {
+            if (msg.id == reply->id) {
+                break;
+            }
+        }
+        if (reply == NULL) {
+            error_setg(&local_err, "vfio_user_recv unexpected reply");
+            goto err;
+        }
+        QTAILQ_REMOVE(&proxy->pending, reply, next);
+
+        /*
+         * Process any received FDs
+         */
+        if (numfds != 0) {
+            if (reply->fds == NULL || reply->fds->recv_fds < numfds) {
+                error_setg(&local_err, "vfio_user_recv unexpected FDs");
+                goto err;
+            }
+            reply->fds->recv_fds = numfds;
+            memcpy(reply->fds->fds, fdp, numfds * sizeof(int));
+        }
+
+    } else {
+        /*
+         * The client doesn't expect any FDs in requests, but
+         * they will be expected on the server
+         */
+        if (numfds != 0 && (proxy->flags & VFIO_PROXY_CLIENT)) {
+            error_setg(&local_err, "vfio_user_recv fd in client reply");
+            goto err;
+        }
+        reqfds.recv_fds = numfds;
+    }
+
+    /*
+     * put the whole message into a single buffer
+     */
+    if (isreply) {
+        if (msg.size > reply->rsize) {
+            error_setg(&local_err,
+                       "vfio_user_recv reply larger than recv buffer");
+            goto fatal;
+        }
+        *reply->msg = msg;
+        data = (char *)reply->msg + sizeof(msg);
+    } else {
+        if (msg.size > max_xfer_size) {
+            error_setg(&local_err, "vfio_user_recv request larger than max");
+            goto fatal;
+        }
+        buf = g_malloc0(msg.size);
+        memcpy(buf, &msg, sizeof(msg));
+        data = buf + sizeof(msg);
+    }
+
+    msgleft = msg.size - sizeof(msg);
+    if (msgleft != 0) {
+        ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);
+        if (ret < 0) {
+            goto fatal;
+        }
+        if (ret != msgleft) {
+            error_setg(&local_err, "vfio_user_recv short read of msg body");
+            goto err;
+        }
+    }
+
+    /*
+     * Replies signal a waiter, requests get processed by vfio code
+     * that may assume the iothread lock is held.
+     */
+    if (isreply) {
+        reply->complete = 1;
+        if (!reply->nowait) {
+            qemu_cond_signal(&reply->cv);
+        } else {
+            if (msg.flags & VFIO_USER_ERROR) {
+                error_printf("vfio_user_rcv error reply on async request ");
+                error_printf("command %x error %s\n", msg.command,
+                             strerror(msg.error_reply));
+            }
+            /* just free it if no one is waiting */
+            reply->nowait = 0;
+            if (proxy->last_nowait == reply) {
+                proxy->last_nowait = NULL;
+            }
+            g_free(reply->msg);
+            QTAILQ_INSERT_HEAD(&proxy->free, reply, next);
+        }
+        qemu_mutex_unlock(&proxy->lock);
+    } else {
+        qemu_mutex_unlock(&proxy->lock);
+        qemu_mutex_lock_iothread();
+        /*
+         * make sure proxy wasn't closed while we waited
+         * checking state without holding the proxy lock is safe
+         * since it's only set to CLOSING when BQL is held
+         */
+        if (proxy->state != VFIO_PROXY_CLOSING) {
+            ret = proxy->request(proxy->reqarg, buf, &reqfds);
+            if (ret < 0 && !(msg.flags & VFIO_USER_NO_REPLY)) {
+                vfio_user_send_reply(proxy, buf, ret);
+            }
+        }
+        qemu_mutex_unlock_iothread();
+    }
+    return;
+
+fatal:
+    vfio_user_shutdown(proxy);
+    proxy->state = VFIO_PROXY_RECV_ERROR;
+
+err:
+    for (i = 0; i < numfds; i++) {
+        close(fdp[i]);
+    }
+    if (reply != NULL) {
+        /* force an error to keep sending thread from hanging */
+        reply->msg->flags |= VFIO_USER_ERROR;
+        reply->msg->error_reply = EINVAL;
+        reply->complete = 1;
+        qemu_cond_signal(&reply->cv);
+    }
+    qemu_mutex_unlock(&proxy->lock);
+    error_report_err(local_err);
+}
+
 static void vfio_user_cb(void *opaque)
 {
     VFIOProxy *proxy = opaque;
@@ -101,6 +376,20 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
     return proxy;
 }
 
+void vfio_user_set_reqhandler(VFIODevice *vbasedev,
+                              int (*handler)(void *opaque, char *buf,
+                                             VFIOUserFDs *fds),
+                              void *reqarg)
+{
+    VFIOProxy *proxy = vbasedev->proxy;
+
+    proxy->request = handler;
+    proxy->reqarg = reqarg;
+    qio_channel_set_aio_fd_handler(proxy->ioc,
+                                   iothread_get_aio_context(vfio_user_iothread),
+                                   vfio_user_recv, NULL, vbasedev);
+}
+
 void vfio_user_disconnect(VFIOProxy *proxy)
 {
     VFIOUserReply *r1, *r2;
diff --git a/MAINTAINERS b/MAINTAINERS
index f429bab391..52d37dd088 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -1888,6 +1888,7 @@ S: Supported
 F: docs/devel/vfio-user.rst
 F: hw/vfio/user.c
 F: hw/vfio/user.h
+F: hw/vfio/user-protocol.h
 
 vhost
 M: Michael S. Tsirkin <mst@redhat.com>
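Both reviewers agree above that the error path can leak a nowait reply. A
minimal sketch of one possible fix for the err: label follows; it mirrors the
nowait handling used earlier in vfio_user_recv() and assumes reply->msg was
heap-allocated by the sender for nowait requests. It is not part of the
posted patch:

err:
    for (i = 0; i < numfds; i++) {
        close(fdp[i]);
    }
    if (reply != NULL) {
        if (reply->nowait) {
            /* no one will wait on reply->cv, so recycle instead of leaking */
            reply->nowait = 0;
            if (proxy->last_nowait == reply) {
                proxy->last_nowait = NULL;
            }
            g_free(reply->msg);
            QTAILQ_INSERT_HEAD(&proxy->free, reply, next);
        } else {
            /* force an error to keep the sending thread from hanging */
            reply->msg->flags |= VFIO_USER_ERROR;
            reply->msg->error_reply = EINVAL;
            reply->complete = 1;
            /* note: any fds copied into reply->fds were closed above */
            qemu_cond_signal(&reply->cv);
        }
    }
    qemu_mutex_unlock(&proxy->lock);
    error_report_err(local_err);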