@@ -45,6 +45,7 @@ typedef struct {
/* Protected by ctx lock */
bool in_qio_channel_yield;
bool wait_idle;
+ bool quiescing;
VuDev vu_dev;
QIOChannel *ioc; /* The I/O channel with the client */
QIOChannelSocket *sioc; /* The underlying data channel with the client */
@@ -283,6 +283,7 @@ static void vu_blk_drained_begin(void *opaque)
{
VuBlkExport *vexp = opaque;
+ vexp->vu_server.quiescing = true;
vhost_user_server_detach_aio_context(&vexp->vu_server);
}
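
These two callbacks are what the block layer invokes around every drained section of the exported node, so the new quiescing flag is raised for exactly the window in which the export must not pick up new requests. A rough caller-side sketch (illustrative only; bs stands for the exported BlockDriverState and is not a name from this patch):

    bdrv_drained_begin(bs);   /* -> vu_blk_drained_begin(): quiescing = true, detach ctx  */
    /* ... graph manipulation, device removal, etc. ... */
    bdrv_drained_end(bs);     /* -> vu_blk_drained_end():   quiescing = false, attach ctx */
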
@@ -291,19 +292,23 @@ static void vu_blk_drained_end(void *opaque)
{
VuBlkExport *vexp = opaque;
+ vexp->vu_server.quiescing = false;
vhost_user_server_attach_aio_context(&vexp->vu_server, vexp->export.ctx);
}
/*
- * Ensures that bdrv_drained_begin() waits until in-flight requests complete.
+ * Ensures that bdrv_drained_begin() waits until in-flight requests complete
+ * and the server->co_trip coroutine has terminated. It will be restarted in
+ * vhost_user_server_attach_aio_context().
*
* Called with vexp->export.ctx acquired.
*/
static bool vu_blk_drained_poll(void *opaque)
{
VuBlkExport *vexp = opaque;
+ VuServer *server = &vexp->vu_server;
- return vhost_user_server_has_in_flight(&vexp->vu_server);
+ return server->co_trip || vhost_user_server_has_in_flight(server);
}
static const BlockDevOps vu_blk_dev_ops = {
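
vu_blk_drained_poll() is the condition that makes the drain actually wait: bdrv_drained_begin() keeps running the AioContext until the callback returns false. A simplified sketch of that caller-side loop (the real code uses BDRV_POLL_WHILE()/AIO_WAIT_WHILE() in the block layer; aio_wait_kick() in vu_client_trip() is what wakes it up to re-check):

    while (vu_blk_drained_poll(vexp)) {
        /* run pending handlers so in-flight requests and co_trip can finish */
        aio_poll(vexp->export.ctx, true);
    }
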
@@ -132,8 +132,7 @@ vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
qio_channel_yield(ioc, G_IO_IN);
server->in_qio_channel_yield = false;
} else {
- /* Wait until attached to an AioContext again */
- qemu_coroutine_yield();
+ return false;
}
continue;
} else {
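
Returning false is only meaningful one level up: instead of parking the coroutine with qemu_coroutine_yield() until re-attach, the failed read makes vu_dispatch() fail, and vu_client_trip() gets to decide whether this is a drain or a broken connection. Rough call-chain sketch (illustrative, not part of the patch):

    /*
     * vu_message_read():  server->ctx == NULL        -> return false
     * vu_dispatch():      message read failed        -> return false
     * vu_client_trip():   server->quiescing or !ctx  -> terminate the coroutine
     *                     ctx still set              -> treat as broken client
     */
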
@@ -201,8 +200,16 @@ static coroutine_fn void vu_client_trip(void *opaque)
VuServer *server = opaque;
VuDev *vu_dev = &server->vu_dev;
- while (!vu_dev->broken && vu_dispatch(vu_dev)) {
- /* Keep running */
+ while (!vu_dev->broken) {
+ if (server->quiescing) {
+ server->co_trip = NULL;
+ aio_wait_kick();
+ return;
+ }
+ /* vu_dispatch() returns false if server->ctx went away */
+ if (!vu_dispatch(vu_dev) && server->ctx) {
+ break;
+ }
}
if (vhost_user_server_has_in_flight(server)) {
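
The co_trip = NULL / aio_wait_kick() pair is the coroutine's side of the handshake with vu_blk_drained_poll(): clearing co_trip changes the poll condition, and the kick wakes the waiter so it re-evaluates. A stand-alone model of that handshake (assumptions: plain C, no real coroutines or QEMU APIs, all names below are made up for illustration):

    #include <stdbool.h>
    #include <stdio.h>

    typedef struct {
        bool quiescing;   /* set by drained_begin, cleared by drained_end */
        bool co_trip;     /* stands in for the Coroutine *co_trip pointer */
        int  in_flight;   /* outstanding requests */
    } Model;

    /* One scheduling step of the vu_client_trip() loop. */
    static void client_trip_step(Model *s)
    {
        if (s->quiescing) {
            s->co_trip = false;   /* server->co_trip = NULL; aio_wait_kick(); return; */
            return;
        }
        /* otherwise it would dispatch one request here */
    }

    /* What vu_blk_drained_poll() reports to the drain loop. */
    static bool drained_poll(Model *s)
    {
        return s->co_trip || s->in_flight > 0;
    }

    int main(void)
    {
        Model s = { .quiescing = false, .co_trip = true, .in_flight = 0 };

        s.quiescing = true;              /* drained_begin */
        while (drained_poll(&s)) {       /* drain loop; AIO_WAIT_WHILE() in QEMU */
            client_trip_step(&s);        /* in QEMU, aio_poll() runs the coroutine */
        }
        printf("quiesced: co_trip=%d in_flight=%d\n", s.co_trip, s.in_flight);
        return 0;
    }
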
@@ -353,8 +360,7 @@ static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
qio_channel_set_follow_coroutine_ctx(server->ioc, true);
- server->co_trip = qemu_coroutine_create(vu_client_trip, server);
-
+ /* Attaching the AioContext starts the vu_client_trip coroutine */
aio_context_acquire(server->ctx);
vhost_user_server_attach_aio_context(server, server->ctx);
aio_context_release(server->ctx);
@@ -413,8 +419,25 @@ void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx)
NULL, NULL, vu_fd_watch);
}
- assert(!server->in_qio_channel_yield);
- aio_co_schedule(ctx, server->co_trip);
+ if (server->co_trip) {
+ /*
+ * The caller didn't fully shut down co_trip (this can happen on
+ * non-polling drains like in bdrv_graph_wrlock()). This is okay as long
+ * as the caller no longer tries to shut it down and we're guaranteed to still
+ * be in the same AioContext as before.
+ *
+ * co_ctx can still be NULL if we get multiple calls and only just
+ * scheduled a new coroutine in the else branch.
+ */
+ AioContext *co_ctx = qemu_coroutine_get_aio_context(server->co_trip);
+
+ assert(!server->quiescing);
+ assert(!co_ctx || co_ctx == ctx);
+ } else {
+ server->co_trip = qemu_coroutine_create(vu_client_trip, server);
+ assert(!server->in_qio_channel_yield);
+ aio_co_schedule(ctx, server->co_trip);
+ }
}
/* Called with server->ctx acquired */
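
The if/else above distinguishes the two ways a drained section can end. A short sketch of both sequences (illustrative):

    /*
     * Polling drain (e.g. blk_drain()):
     *   drained_begin -> quiescing = true, detach AioContext
     *   drained_poll  -> waits until co_trip == NULL and nothing is in flight
     *   drained_end   -> attach: co_trip is NULL, so the else branch creates
     *                    and schedules a fresh vu_client_trip coroutine
     *
     * Non-polling drain (e.g. bdrv_graph_wrlock()):
     *   drained_begin -> quiescing = true, detach AioContext
     *   (no polling)  -> co_trip may survive and may not even have run yet
     *   drained_end   -> attach: the if branch keeps the existing coroutine and
     *                    only asserts that it never moved to another AioContext
     */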