@@ -132,6 +132,9 @@ struct NBDClient {
CoMutex send_lock;
Coroutine *send_coroutine;
+ bool read_yielding;
+ bool quiescing;
+
QTAILQ_ENTRY(NBDClient) next;
int nb_requests;
bool closing;
@@ -1352,14 +1355,60 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
return 0;
}
-static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
+/* nbd_read_eof
+ * Tries to read @size bytes from @ioc. This is a local implementation of
+ * qio_channel_readv_all_eof. We have it here because we need it to be
+ * interruptible and to know when the coroutine is yielding.
+ * Returns 1 on success
+ * 0 on eof, when no data was read (errp is not set)
+ * negative errno on failure (errp is set)
+ */
+static inline int coroutine_fn
+nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
+{
+ bool partial = false;
+
+ assert(size);
+ while (size > 0) {
+ struct iovec iov = { .iov_base = buffer, .iov_len = size };
+ ssize_t len;
+
+ len = qio_channel_readv(client->ioc, &iov, 1, errp);
+ if (len == QIO_CHANNEL_ERR_BLOCK) {
+ client->read_yielding = true;
+ qio_channel_yield(client->ioc, G_IO_IN);
+ client->read_yielding = false;
+ if (client->quiescing) {
+ return -EAGAIN;
+ }
+ continue;
+ } else if (len < 0) {
+ return -EIO;
+ } else if (len == 0) {
+ if (partial) {
+ error_setg(errp,
+ "Unexpected end-of-file before all bytes were read");
+ return -EIO;
+ } else {
+ return 0;
+ }
+ }
+
+ partial = true;
+ size -= len;
+ buffer = (uint8_t *) buffer + len;
+ }
+ return 1;
+}
+
+static int nbd_receive_request(NBDClient *client, NBDRequest *request,
Error **errp)
{
uint8_t buf[NBD_REQUEST_SIZE];
uint32_t magic;
int ret;
- ret = nbd_read(ioc, buf, sizeof(buf), "request", errp);
+ ret = nbd_read_eof(client, buf, sizeof(buf), errp);
if (ret < 0) {
return ret;
}
@@ -1480,11 +1529,37 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
QTAILQ_FOREACH(client, &exp->clients, next) {
qio_channel_attach_aio_context(client->ioc, ctx);
+
+ assert(client->recv_coroutine == NULL);
+ assert(client->send_coroutine == NULL);
+
+ if (client->quiescing) {
+ client->quiescing = false;
+ nbd_client_receive_next_request(client);
+ }
+ }
+}
+
+static void nbd_aio_detach_bh(void *opaque)
+{
+ NBDExport *exp = opaque;
+ NBDClient *client;
+
+ QTAILQ_FOREACH(client, &exp->clients, next) {
+ qio_channel_detach_aio_context(client->ioc);
+ client->quiescing = true;
+
if (client->recv_coroutine) {
- aio_co_schedule(ctx, client->recv_coroutine);
+ if (client->read_yielding) {
+ qemu_aio_coroutine_enter(exp->common.ctx,
+ client->recv_coroutine);
+ } else {
+ AIO_WAIT_WHILE(exp->common.ctx, client->recv_coroutine != NULL);
+ }
}
+
if (client->send_coroutine) {
- aio_co_schedule(ctx, client->send_coroutine);
+ AIO_WAIT_WHILE(exp->common.ctx, client->send_coroutine != NULL);
}
}
}
@@ -1492,13 +1567,10 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
static void blk_aio_detach(void *opaque)
{
NBDExport *exp = opaque;
- NBDClient *client;
trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
- QTAILQ_FOREACH(client, &exp->clients, next) {
- qio_channel_detach_aio_context(client->ioc);
- }
+ aio_wait_bh_oneshot(exp->common.ctx, nbd_aio_detach_bh, exp);
exp->common.ctx = NULL;
}
@@ -2151,20 +2223,23 @@ static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
/* nbd_co_receive_request
* Collect a client request. Return 0 if request looks valid, -EIO to drop
- * connection right away, and any other negative value to report an error to
- * the client (although the caller may still need to disconnect after reporting
- * the error).
+ * connection right away, -EAGAIN to indicate we were interrupted and the
+ * channel should be quiesced, and any other negative value to report an error
+ * to the client (although the caller may still need to disconnect after
+ * reporting the error).
*/
static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
Error **errp)
{
NBDClient *client = req->client;
int valid_flags;
+ int ret;
g_assert(qemu_in_coroutine());
assert(client->recv_coroutine == qemu_coroutine_self());
- if (nbd_receive_request(client->ioc, request, errp) < 0) {
- return -EIO;
+ ret = nbd_receive_request(client, request, errp);
+ if (ret < 0) {
+ return ret;
}
trace_nbd_co_receive_request_decode_type(request->handle, request->type,
@@ -2507,6 +2582,17 @@ static coroutine_fn void nbd_trip(void *opaque)
return;
}
+ if (client->quiescing) {
+ /*
+ * We're switching between AIO contexts. Don't attempt to receive a new
+ * request and kick the main context which may be waiting for us.
+ */
+ nbd_client_put(client);
+ client->recv_coroutine = NULL;
+ aio_wait_kick();
+ return;
+ }
+
req = nbd_request_get(client);
ret = nbd_co_receive_request(req, &request, &local_err);
client->recv_coroutine = NULL;
@@ -2519,6 +2605,11 @@ static coroutine_fn void nbd_trip(void *opaque)
goto done;
}
+ if (ret == -EAGAIN) {
+ assert(client->quiescing);
+ goto done;
+ }
+
nbd_client_receive_next_request(client);
if (ret == -EIO) {
goto disconnect;
@@ -2565,7 +2656,8 @@ disconnect:
static void nbd_client_receive_next_request(NBDClient *client)
{
- if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
+ if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
+ !client->quiescing) {
nbd_client_get(client);
client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);