@@ -71,7 +71,7 @@ static void nbd_reply_ready(void *opaque)
* that another thread has done the same thing in parallel, so
* the socket is not readable anymore.
*/
- ret = nbd_receive_reply(s->sioc->fd, &s->reply);
+ ret = nbd_receive_reply(s->ioc, &s->reply);
if (ret == -EAGAIN) {
return;
}
@@ -122,6 +122,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
}
}
+ g_assert(qemu_in_coroutine());
assert(i < MAX_NBD_REQUESTS);
request->handle = INDEX_TO_HANDLE(s, i);
s->send_coroutine = qemu_coroutine_self();
@@ -131,17 +132,17 @@ static int nbd_co_send_request(BlockDriverState *bs,
nbd_reply_ready, nbd_restart_write, bs);
if (qiov) {
qio_channel_set_cork(s->ioc, true);
- rc = nbd_send_request(s->sioc->fd, request);
+ rc = nbd_send_request(s->ioc, request);
if (rc >= 0) {
- ret = qemu_co_sendv(s->sioc->fd, qiov->iov, qiov->niov,
- offset, request->len);
+ ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
+ offset, request->len, 0);
if (ret != request->len) {
rc = -EIO;
}
}
qio_channel_set_cork(s->ioc, false);
} else {
- rc = nbd_send_request(s->sioc->fd, request);
+ rc = nbd_send_request(s->ioc, request);
}
aio_set_fd_handler(aio_context, s->sioc->fd, false,
nbd_reply_ready, NULL, bs);
@@ -164,8 +165,8 @@ static void nbd_co_receive_reply(NbdClientSession *s,
reply->error = EIO;
} else {
if (qiov && reply->error == 0) {
- ret = qemu_co_recvv(s->sioc->fd, qiov->iov, qiov->niov,
- offset, request->len);
+ ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
+ offset, request->len, 1);
if (ret != request->len) {
reply->error = EIO;
}
@@ -372,7 +373,7 @@ void nbd_client_close(BlockDriverState *bs)
return;
}
- nbd_send_request(client->sioc->fd, &request);
+ nbd_send_request(client->ioc, &request);
nbd_teardown_connection(bs);
}
@@ -387,7 +388,7 @@ int nbd_client_init(BlockDriverState *bs, QIOChannelSocket *sioc,
logout("session init %s\n", export);
qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
- ret = nbd_receive_negotiate(sioc->fd, export,
+ ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
&client->nbdflags, &client->size, errp);
if (ret < 0) {
logout("Failed to negotiate with the NBD server\n");
@@ -26,7 +26,6 @@ static gboolean nbd_accept(QIOChannel *ioc, GIOCondition condition,
gpointer opaque)
{
QIOChannelSocket *cioc;
- int fd;
cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
NULL);
@@ -34,10 +33,7 @@ static gboolean nbd_accept(QIOChannel *ioc, GIOCondition condition,
return TRUE;
}
- fd = dup(cioc->fd);
- if (fd >= 0) {
- nbd_client_new(NULL, fd, nbd_client_put);
- }
+ nbd_client_new(NULL, cioc, nbd_client_put);
object_unref(OBJECT(cioc));
return TRUE;
}
@@ -23,6 +23,7 @@
#include "qemu-common.h"
#include "qemu/option.h"
+#include "io/channel-socket.h"
struct nbd_request {
uint32_t magic;
@@ -73,12 +74,17 @@ enum {
/* Maximum size of a single READ/WRITE data buffer */
#define NBD_MAX_BUFFER_SIZE (32 * 1024 * 1024)
-ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read);
-int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
+ssize_t nbd_wr_syncv(QIOChannel *ioc,
+ struct iovec *iov,
+ size_t niov,
+ size_t offset,
+ size_t length,
+ bool do_read);
+int nbd_receive_negotiate(QIOChannel *ioc, const char *name, uint32_t *flags,
off_t *size, Error **errp);
-int nbd_init(int fd, int csock, uint32_t flags, off_t size);
-ssize_t nbd_send_request(int csock, struct nbd_request *request);
-ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply);
+int nbd_init(int fd, QIOChannelSocket *sioc, uint32_t flags, off_t size);
+ssize_t nbd_send_request(QIOChannel *ioc, struct nbd_request *request);
+ssize_t nbd_receive_reply(QIOChannel *ioc, struct nbd_reply *reply);
int nbd_client(int fd);
int nbd_disconnect(int fd);
@@ -98,7 +104,9 @@ NBDExport *nbd_export_find(const char *name);
void nbd_export_set_name(NBDExport *exp, const char *name);
void nbd_export_close_all(void);
-void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *));
+void nbd_client_new(NBDExport *exp,
+ QIOChannelSocket *sioc,
+ void (*close)(NBDClient *));
void nbd_client_get(NBDClient *client);
void nbd_client_put(NBDClient *client);
@@ -70,7 +70,7 @@ static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
*/
-int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
+int nbd_receive_negotiate(QIOChannel *ioc, const char *name, uint32_t *flags,
off_t *size, Error **errp)
{
char buf[256];
@@ -82,7 +82,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
rc = -EINVAL;
- if (read_sync(csock, buf, 8) != 8) {
+ if (read_sync(ioc, buf, 8) != 8) {
error_setg(errp, "Failed to read data");
goto fail;
}
@@ -108,7 +108,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
goto fail;
}
- if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+ if (read_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
error_setg(errp, "Failed to read magic");
goto fail;
}
@@ -129,35 +129,35 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
}
goto fail;
}
- if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+ if (read_sync(ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) {
error_setg(errp, "Failed to read server flags");
goto fail;
}
*flags = be16_to_cpu(tmp) << 16;
/* reserved for future use */
- if (write_sync(csock, &reserved, sizeof(reserved)) !=
+ if (write_sync(ioc, &reserved, sizeof(reserved)) !=
sizeof(reserved)) {
error_setg(errp, "Failed to read reserved field");
goto fail;
}
/* write the export name */
magic = cpu_to_be64(magic);
- if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+ if (write_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
error_setg(errp, "Failed to send export name magic");
goto fail;
}
opt = cpu_to_be32(NBD_OPT_EXPORT_NAME);
- if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+ if (write_sync(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
error_setg(errp, "Failed to send export name option number");
goto fail;
}
namesize = cpu_to_be32(strlen(name));
- if (write_sync(csock, &namesize, sizeof(namesize)) !=
+ if (write_sync(ioc, &namesize, sizeof(namesize)) !=
sizeof(namesize)) {
error_setg(errp, "Failed to send export name length");
goto fail;
}
- if (write_sync(csock, (char*)name, strlen(name)) != strlen(name)) {
+ if (write_sync(ioc, (char *)name, strlen(name)) != strlen(name)) {
error_setg(errp, "Failed to send export name");
goto fail;
}
@@ -174,7 +174,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
}
}
- if (read_sync(csock, &s, sizeof(s)) != sizeof(s)) {
+ if (read_sync(ioc, &s, sizeof(s)) != sizeof(s)) {
error_setg(errp, "Failed to read export length");
goto fail;
}
@@ -182,19 +182,19 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
TRACE("Size is %" PRIu64, *size);
if (!name) {
- if (read_sync(csock, flags, sizeof(*flags)) != sizeof(*flags)) {
+ if (read_sync(ioc, flags, sizeof(*flags)) != sizeof(*flags)) {
error_setg(errp, "Failed to read export flags");
goto fail;
}
*flags = be32_to_cpup(flags);
} else {
- if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+ if (read_sync(ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) {
error_setg(errp, "Failed to read export flags");
goto fail;
}
*flags |= be16_to_cpu(tmp);
}
- if (read_sync(csock, &buf, 124) != 124) {
+ if (read_sync(ioc, &buf, 124) != 124) {
error_setg(errp, "Failed to read reserved block");
goto fail;
}
@@ -205,11 +205,11 @@ fail:
}
#ifdef __linux__
-int nbd_init(int fd, int csock, uint32_t flags, off_t size)
+int nbd_init(int fd, QIOChannelSocket *sioc, uint32_t flags, off_t size)
{
TRACE("Setting NBD socket");
- if (ioctl(fd, NBD_SET_SOCK, csock) < 0) {
+ if (ioctl(fd, NBD_SET_SOCK, sioc->fd) < 0) {
int serrno = errno;
LOG("Failed to set NBD socket");
return -serrno;
@@ -282,7 +282,7 @@ int nbd_client(int fd)
return ret;
}
#else
-int nbd_init(int fd, int csock, uint32_t flags, off_t size)
+int nbd_init(int fd, QIOChannelSocket *ioc, uint32_t flags, off_t size)
{
return -ENOTSUP;
}
@@ -293,7 +293,7 @@ int nbd_client(int fd)
}
#endif
-ssize_t nbd_send_request(int csock, struct nbd_request *request)
+ssize_t nbd_send_request(QIOChannel *ioc, struct nbd_request *request)
{
uint8_t buf[NBD_REQUEST_SIZE];
ssize_t ret;
@@ -308,7 +308,7 @@ ssize_t nbd_send_request(int csock, struct nbd_request *request)
"{ .from = %" PRIu64", .len = %u, .handle = %" PRIu64", .type=%i}",
request->from, request->len, request->handle, request->type);
- ret = write_sync(csock, buf, sizeof(buf));
+ ret = write_sync(ioc, buf, sizeof(buf));
if (ret < 0) {
return ret;
}
@@ -320,13 +320,13 @@ ssize_t nbd_send_request(int csock, struct nbd_request *request)
return 0;
}
-ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply)
+ssize_t nbd_receive_reply(QIOChannel *ioc, struct nbd_reply *reply)
{
uint8_t buf[NBD_REPLY_SIZE];
uint32_t magic;
ssize_t ret;
- ret = read_sync(csock, buf, sizeof(buf));
+ ret = read_sync(ioc, buf, sizeof(buf));
if (ret < 0) {
return ret;
}
@@ -18,47 +18,107 @@
#include "nbd-internal.h"
-ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
+/**
+ * @skip: number of bytes to advance head of @iov
+ * @iov: pointer to iov array, updated on success
+ * @niov: number of elements in @iov, updated on success
+ * @oldiov: pointer single element to hold old info from @iov
+ *
+ * This will update @iov so that its head is advanced
+ * by @skip bytes. To do this, zero or more complete
+ * elements of @iov will be skipped over. The new head
+ * of @iov will then have its base & len updated to
+ * skip the remaining number of bytes. @oldiov will
+ * hold the original data from the new head of @iov.
+ */
+static void nbd_skip_iov(size_t skip,
+ struct iovec **iov,
+ int *niov,
+ struct iovec *oldiov)
{
- size_t offset = 0;
- int err;
-
- if (qemu_in_coroutine()) {
- if (do_read) {
- return qemu_co_recv(fd, buffer, size);
+ ssize_t done = 0;
+ size_t i;
+ for (i = 0; i < *niov; i++) {
+ if ((*iov)[i].iov_len <= skip) {
+ done += (*iov)[i].iov_len;
+ skip -= (*iov)[i].iov_len;
} else {
- return qemu_co_send(fd, buffer, size);
+ done += skip;
+ oldiov->iov_base = (*iov)[i].iov_base;
+ oldiov->iov_len = (*iov)[i].iov_len;
+ (*iov)[i].iov_len -= skip;
+ (*iov)[i].iov_base += skip;
+ *iov = *iov + i;
+ *niov = *niov - i;
+ break;
}
}
+}
+
+ssize_t nbd_wr_syncv(QIOChannel *ioc,
+ struct iovec *iov,
+ size_t niov,
+ size_t offset,
+ size_t length,
+ bool do_read)
+{
+ ssize_t done = 0;
+ size_t iovlen = iov_size(iov, niov);
+ struct iovec oldiov = { NULL, 0 };
+ Error *local_err = NULL;
+
+ /* Shouldn't happen but just to be sure */
+ if (offset > iovlen) {
+ return -EINVAL;
+ }
+ iovlen -= offset;
+ if (length > iovlen) {
+ return -EINVAL;
+ }
- while (offset < size) {
+ while (done < length) {
ssize_t len;
+ struct iovec *cur = iov;
+ int curcnt = niov;
+
+ nbd_skip_iov(offset + done, &cur, &curcnt, &oldiov);
if (do_read) {
- len = qemu_recv(fd, buffer + offset, size - offset, 0);
+ len = qio_channel_readv(ioc, cur, curcnt, &local_err);
} else {
- len = send(fd, buffer + offset, size - offset, 0);
+ len = qio_channel_writev(ioc, cur, curcnt, &local_err);
}
-
- if (len < 0) {
- err = socket_error();
-
- /* recoverable error */
- if (err == EINTR || (offset > 0 && (err == EAGAIN || err == EWOULDBLOCK))) {
- continue;
+ if (oldiov.iov_base) {
+ /* Restore original caller's info in @iov */
+ cur[0].iov_base = oldiov.iov_base;
+ cur[0].iov_len = oldiov.iov_len;
+ oldiov.iov_base = NULL;
+ oldiov.iov_len = 0;
+ }
+ if (len == QIO_CHANNEL_ERR_BLOCK) {
+ if (qemu_in_coroutine()) {
+ /* XXX see about using qio_channel_yield() in
+ * future if we can use normal watches instead
+ * of the AIO handlers */
+ qemu_coroutine_yield();
+ } else {
+ qio_channel_wait(ioc,
+ do_read ? G_IO_IN : G_IO_OUT);
}
-
- /* unrecoverable error */
- return -err;
+ continue;
+ }
+ if (len < 0) {
+ TRACE("I/O error: %s", error_get_pretty(local_err));
+ error_free(local_err);
+ /* XXX handle Error objects */
+ return -EIO;
}
- /* eof */
- if (len == 0) {
+ if (do_read && len == 0) {
break;
}
- offset += len;
+ done += len;
}
-
- return offset;
+ return done;
}
@@ -13,6 +13,7 @@
#include "sysemu/block-backend.h"
#include "qemu/coroutine.h"
+#include "qemu/iov.h"
#include <errno.h>
#include <string.h>
@@ -29,7 +30,6 @@
#include <linux/fs.h>
#endif
-#include "qemu/sockets.h"
#include "qemu/queue.h"
#include "qemu/main-loop.h"
@@ -90,24 +90,22 @@
#define NBD_EINVAL 22
#define NBD_ENOSPC 28
-static inline ssize_t read_sync(int fd, void *buffer, size_t size)
+static inline ssize_t read_sync(QIOChannel *ioc, void *buffer, size_t size)
{
+ struct iovec iov = { .iov_base = buffer, .iov_len = size };
/* Sockets are kept in blocking mode in the negotiation phase. After
* that, a non-readable socket simply means that another thread stole
* our request/reply. Synchronization is done with recv_coroutine, so
* that this is coroutine-safe.
*/
- return nbd_wr_sync(fd, buffer, size, true);
+ return nbd_wr_syncv(ioc, &iov, 1, 0, size, true);
}
-static inline ssize_t write_sync(int fd, void *buffer, size_t size)
+static inline ssize_t write_sync(QIOChannel *ioc, void *buffer, size_t size)
{
- int ret;
- do {
- /* For writes, we do expect the socket to be writable. */
- ret = nbd_wr_sync(fd, buffer, size, false);
- } while (ret == -EAGAIN);
- return ret;
+ struct iovec iov = { .iov_base = buffer, .iov_len = size };
+
+ return nbd_wr_syncv(ioc, &iov, 1, 0, size, false);
}
#endif
@@ -73,7 +73,8 @@ struct NBDClient {
void (*close)(NBDClient *client);
NBDExport *exp;
- int sock;
+ QIOChannelSocket *sioc; /* The underlying data channel */
+ QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
Coroutine *recv_coroutine;
@@ -93,45 +94,56 @@ static void nbd_set_handlers(NBDClient *client);
static void nbd_unset_handlers(NBDClient *client);
static void nbd_update_can_read(NBDClient *client);
-static void nbd_negotiate_continue(void *opaque)
+static gboolean nbd_negotiate_continue(QIOChannel *ioc,
+ GIOCondition condition,
+ void *opaque)
{
qemu_coroutine_enter(opaque, NULL);
+ return TRUE;
}
-static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size)
+static ssize_t nbd_negotiate_read(QIOChannel *ioc, void *buffer, size_t size)
{
ssize_t ret;
+ guint watch;
assert(qemu_in_coroutine());
/* Negotiation are always in main loop. */
- qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL,
- qemu_coroutine_self());
- ret = read_sync(fd, buffer, size);
- qemu_set_fd_handler(fd, NULL, NULL, NULL);
+ watch = qio_channel_add_watch(ioc,
+ G_IO_IN,
+ nbd_negotiate_continue,
+ qemu_coroutine_self(),
+ NULL);
+ ret = read_sync(ioc, buffer, size);
+ g_source_remove(watch);
return ret;
}
-static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size)
+static ssize_t nbd_negotiate_write(QIOChannel *ioc, void *buffer, size_t size)
{
ssize_t ret;
+ guint watch;
assert(qemu_in_coroutine());
/* Negotiation are always in main loop. */
- qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue,
- qemu_coroutine_self());
- ret = write_sync(fd, buffer, size);
- qemu_set_fd_handler(fd, NULL, NULL, NULL);
+ watch = qio_channel_add_watch(ioc,
+ G_IO_OUT,
+ nbd_negotiate_continue,
+ qemu_coroutine_self(),
+ NULL);
+ ret = write_sync(ioc, buffer, size);
+ g_source_remove(watch);
return ret;
}
-static ssize_t nbd_negotiate_drop_sync(int fd, size_t size)
+static ssize_t nbd_negotiate_drop_sync(QIOChannel *ioc, size_t size)
{
ssize_t ret, dropped = size;
uint8_t *buffer = g_malloc(MIN(65536, size));
while (size > 0) {
- ret = nbd_negotiate_read(fd, buffer, MIN(65536, size));
+ ret = nbd_negotiate_read(ioc, buffer, MIN(65536, size));
if (ret < 0) {
g_free(buffer);
return ret;
@@ -172,66 +184,66 @@ static ssize_t nbd_negotiate_drop_sync(int fd, size_t size)
*/
-static int nbd_negotiate_send_rep(int csock, uint32_t type, uint32_t opt)
+static int nbd_negotiate_send_rep(QIOChannel *ioc, uint32_t type, uint32_t opt)
{
uint64_t magic;
uint32_t len;
magic = cpu_to_be64(NBD_REP_MAGIC);
- if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+ if (nbd_negotiate_write(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
LOG("write failed (rep magic)");
return -EINVAL;
}
opt = cpu_to_be32(opt);
- if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+ if (nbd_negotiate_write(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
LOG("write failed (rep opt)");
return -EINVAL;
}
type = cpu_to_be32(type);
- if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
+ if (nbd_negotiate_write(ioc, &type, sizeof(type)) != sizeof(type)) {
LOG("write failed (rep type)");
return -EINVAL;
}
len = cpu_to_be32(0);
- if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
+ if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
LOG("write failed (rep data length)");
return -EINVAL;
}
return 0;
}
-static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp)
+static int nbd_negotiate_send_rep_list(QIOChannel *ioc, NBDExport *exp)
{
uint64_t magic, name_len;
uint32_t opt, type, len;
name_len = strlen(exp->name);
magic = cpu_to_be64(NBD_REP_MAGIC);
- if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+ if (nbd_negotiate_write(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
LOG("write failed (magic)");
return -EINVAL;
}
opt = cpu_to_be32(NBD_OPT_LIST);
- if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+ if (nbd_negotiate_write(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
LOG("write failed (opt)");
return -EINVAL;
}
type = cpu_to_be32(NBD_REP_SERVER);
- if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
+ if (nbd_negotiate_write(ioc, &type, sizeof(type)) != sizeof(type)) {
LOG("write failed (reply type)");
return -EINVAL;
}
len = cpu_to_be32(name_len + sizeof(len));
- if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
+ if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
LOG("write failed (length)");
return -EINVAL;
}
len = cpu_to_be32(name_len);
- if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
+ if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
LOG("write failed (length)");
return -EINVAL;
}
- if (nbd_negotiate_write(csock, exp->name, name_len) != name_len) {
+ if (nbd_negotiate_write(ioc, exp->name, name_len) != name_len) {
LOG("write failed (buffer)");
return -EINVAL;
}
@@ -240,30 +252,29 @@ static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp)
static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length)
{
- int csock;
NBDExport *exp;
- csock = client->sock;
if (length) {
- if (nbd_negotiate_drop_sync(csock, length) != length) {
+ if (nbd_negotiate_drop_sync(client->ioc, length) != length) {
return -EIO;
}
- return nbd_negotiate_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
+ return nbd_negotiate_send_rep(client->ioc,
+ NBD_REP_ERR_INVALID, NBD_OPT_LIST);
}
/* For each export, send a NBD_REP_SERVER reply. */
QTAILQ_FOREACH(exp, &exports, next) {
- if (nbd_negotiate_send_rep_list(csock, exp)) {
+ if (nbd_negotiate_send_rep_list(client->ioc, exp)) {
return -EINVAL;
}
}
/* Finish with a NBD_REP_ACK. */
- return nbd_negotiate_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST);
+ return nbd_negotiate_send_rep(client->ioc, NBD_REP_ACK, NBD_OPT_LIST);
}
static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
{
- int rc = -EINVAL, csock = client->sock;
+ int rc = -EINVAL;
char name[256];
/* Client sends:
@@ -274,7 +285,7 @@ static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
LOG("Bad length received");
goto fail;
}
- if (nbd_negotiate_read(csock, name, length) != length) {
+ if (nbd_negotiate_read(client->ioc, name, length) != length) {
LOG("read failed");
goto fail;
}
@@ -295,7 +306,6 @@ fail:
static int nbd_negotiate_options(NBDClient *client)
{
- int csock = client->sock;
uint32_t flags;
/* Client sends:
@@ -312,7 +322,8 @@ static int nbd_negotiate_options(NBDClient *client)
... Rest of request
*/
- if (nbd_negotiate_read(csock, &flags, sizeof(flags)) != sizeof(flags)) {
+ if (nbd_negotiate_read(client->ioc, &flags, sizeof(flags)) !=
+ sizeof(flags)) {
LOG("read failed");
return -EIO;
}
@@ -328,7 +339,8 @@ static int nbd_negotiate_options(NBDClient *client)
uint32_t tmp, length;
uint64_t magic;
- if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+ if (nbd_negotiate_read(client->ioc, &magic, sizeof(magic)) !=
+ sizeof(magic)) {
LOG("read failed");
return -EINVAL;
}
@@ -338,13 +350,13 @@ static int nbd_negotiate_options(NBDClient *client)
return -EINVAL;
}
- if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+ if (nbd_negotiate_read(client->ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) {
LOG("read failed");
return -EINVAL;
}
- if (nbd_negotiate_read(csock, &length,
- sizeof(length)) != sizeof(length)) {
+ if (nbd_negotiate_read(client->ioc, &length, sizeof(length)) !=
+ sizeof(length)) {
LOG("read failed");
return -EINVAL;
}
@@ -368,7 +380,7 @@ static int nbd_negotiate_options(NBDClient *client)
default:
tmp = be32_to_cpu(tmp);
LOG("Unsupported option 0x%x", tmp);
- nbd_negotiate_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp);
+ nbd_negotiate_send_rep(client->ioc, NBD_REP_ERR_UNSUP, tmp);
return -EINVAL;
}
}
@@ -382,7 +394,6 @@ typedef struct {
static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
{
NBDClient *client = data->client;
- int csock = client->sock;
char buf[8 + 8 + 8 + 128];
int rc;
const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
@@ -407,6 +418,7 @@ static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
[28 .. 151] reserved (0)
*/
+ qio_channel_set_blocking(client->ioc, false, NULL);
rc = -EINVAL;
TRACE("Beginning negotiation.");
@@ -423,12 +435,12 @@ static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
}
if (client->exp) {
- if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) {
+ if (nbd_negotiate_write(client->ioc, buf, sizeof(buf)) != sizeof(buf)) {
LOG("write failed");
goto fail;
}
} else {
- if (nbd_negotiate_write(csock, buf, 18) != 18) {
+ if (nbd_negotiate_write(client->ioc, buf, 18) != 18) {
LOG("write failed");
goto fail;
}
@@ -441,8 +453,8 @@ static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
assert ((client->exp->nbdflags & ~65535) == 0);
cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size);
cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
- if (nbd_negotiate_write(csock, buf + 18,
- sizeof(buf) - 18) != sizeof(buf) - 18) {
+ if (nbd_negotiate_write(client->ioc, buf + 18, sizeof(buf) - 18) !=
+ sizeof(buf) - 18) {
LOG("write failed");
goto fail;
}
@@ -472,13 +484,13 @@ int nbd_disconnect(int fd)
}
#endif
-static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
+static ssize_t nbd_receive_request(QIOChannel *ioc, struct nbd_request *request)
{
uint8_t buf[NBD_REQUEST_SIZE];
uint32_t magic;
ssize_t ret;
- ret = read_sync(csock, buf, sizeof(buf));
+ ret = read_sync(ioc, buf, sizeof(buf));
if (ret < 0) {
return ret;
}
@@ -513,7 +525,7 @@ static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
return 0;
}
-static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
+static ssize_t nbd_send_reply(QIOChannel *ioc, struct nbd_reply *reply)
{
uint8_t buf[NBD_REPLY_SIZE];
ssize_t ret;
@@ -531,7 +543,7 @@ static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
TRACE("Sending response to client");
- ret = write_sync(csock, buf, sizeof(buf));
+ ret = write_sync(ioc, buf, sizeof(buf));
if (ret < 0) {
return ret;
}
@@ -559,8 +571,8 @@ void nbd_client_put(NBDClient *client)
assert(client->closing);
nbd_unset_handlers(client);
- close(client->sock);
- client->sock = -1;
+ object_unref(OBJECT(client->sioc));
+ object_unref(OBJECT(client->ioc));
if (client->exp) {
QTAILQ_REMOVE(&client->exp->clients, client, next);
nbd_export_put(client->exp);
@@ -580,7 +592,8 @@ static void client_close(NBDClient *client)
/* Force requests to finish. They will drop their own references,
* then we'll close the socket and free the NBDClient.
*/
- shutdown(client->sock, 2);
+ qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
+ NULL);
/* Also tell the client, so that they release their reference. */
if (client->close) {
@@ -773,25 +786,25 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
int len)
{
NBDClient *client = req->client;
- int csock = client->sock;
ssize_t rc, ret;
+ g_assert(qemu_in_coroutine());
qemu_co_mutex_lock(&client->send_lock);
client->send_coroutine = qemu_coroutine_self();
nbd_set_handlers(client);
if (!len) {
- rc = nbd_send_reply(csock, reply);
+ rc = nbd_send_reply(client->ioc, reply);
} else {
- socket_set_cork(csock, 1);
- rc = nbd_send_reply(csock, reply);
+ qio_channel_set_cork(client->ioc, true);
+ rc = nbd_send_reply(client->ioc, reply);
if (rc >= 0) {
- ret = qemu_co_send(csock, req->data, len);
+ ret = write_sync(client->ioc, req->data, len);
if (ret != len) {
rc = -EIO;
}
}
- socket_set_cork(csock, 0);
+ qio_channel_set_cork(client->ioc, false);
}
client->send_coroutine = NULL;
@@ -803,14 +816,14 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
{
NBDClient *client = req->client;
- int csock = client->sock;
uint32_t command;
ssize_t rc;
+ g_assert(qemu_in_coroutine());
client->recv_coroutine = qemu_coroutine_self();
nbd_update_can_read(client);
- rc = nbd_receive_request(csock, request);
+ rc = nbd_receive_request(client->ioc, request);
if (rc < 0) {
if (rc != -EAGAIN) {
rc = -EIO;
@@ -845,7 +858,7 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque
if (command == NBD_CMD_WRITE) {
TRACE("Reading %u byte(s)", request->len);
- if (qemu_co_recv(csock, req->data, request->len) != request->len) {
+ if (read_sync(client->ioc, req->data, request->len) != request->len) {
LOG("reading from socket failed");
rc = -EIO;
goto out;
@@ -1040,7 +1053,7 @@ static void nbd_restart_write(void *opaque)
static void nbd_set_handlers(NBDClient *client)
{
if (client->exp && client->exp->ctx) {
- aio_set_fd_handler(client->exp->ctx, client->sock,
+ aio_set_fd_handler(client->exp->ctx, client->sioc->fd,
true,
client->can_read ? nbd_read : NULL,
client->send_coroutine ? nbd_restart_write : NULL,
@@ -1051,7 +1064,7 @@ static void nbd_set_handlers(NBDClient *client)
static void nbd_unset_handlers(NBDClient *client)
{
if (client->exp && client->exp->ctx) {
- aio_set_fd_handler(client->exp->ctx, client->sock,
+ aio_set_fd_handler(client->exp->ctx, client->sioc->fd,
true, NULL, NULL, NULL);
}
}
@@ -1093,7 +1106,9 @@ out:
g_free(data);
}
-void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
+void nbd_client_new(NBDExport *exp,
+ QIOChannelSocket *sioc,
+ void (*close_fn)(NBDClient *))
{
NBDClient *client;
NBDClientNewData *data = g_new(NBDClientNewData, 1);
@@ -1101,7 +1116,10 @@ void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
client = g_malloc0(sizeof(NBDClient));
client->refcount = 1;
client->exp = exp;
- client->sock = csock;
+ client->sioc = sioc;
+ object_ref(OBJECT(client->sioc));
+ client->ioc = QIO_CHANNEL(sioc);
+ object_ref(OBJECT(client->ioc));
client->can_read = true;
client->close = close_fn;
@@ -252,7 +252,7 @@ static void *nbd_client_thread(void *arg)
goto out;
}
- ret = nbd_receive_negotiate(sioc->fd, NULL, &nbdflags,
+ ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), NULL, &nbdflags,
&size, &local_error);
if (ret < 0) {
if (local_error) {
@@ -268,7 +268,7 @@ static void *nbd_client_thread(void *arg)
goto out_socket;
}
- ret = nbd_init(fd, sioc->fd, nbdflags, size);
+ ret = nbd_init(fd, sioc, nbdflags, size);
if (ret < 0) {
goto out_fd;
}
@@ -328,7 +328,6 @@ static void nbd_client_closed(NBDClient *client)
static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque)
{
QIOChannelSocket *cioc;
- int fd;
cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
NULL);
@@ -343,10 +342,7 @@ static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque)
nb_fds++;
nbd_update_server_watch();
- fd = dup(cioc->fd);
- if (fd >= 0) {
- nbd_client_new(exp, fd, nbd_client_closed);
- }
+ nbd_client_new(exp, cioc, nbd_client_closed);
object_unref(OBJECT(cioc));
return TRUE;
Now that all callers are converted to use I/O channels for initial connection setup, it is possible to switch the core NBD protocol handling core over to use QIOChannel APIs for actual sockets I/O. Signed-off-by: Daniel P. Berrange <berrange@redhat.com> --- block/nbd-client.c | 19 +++---- blockdev-nbd.c | 6 +-- include/block/nbd.h | 20 ++++--- nbd/client.c | 40 +++++++------- nbd/common.c | 112 ++++++++++++++++++++++++++++++--------- nbd/nbd-internal.h | 18 +++---- nbd/server.c | 150 +++++++++++++++++++++++++++++----------------------- qemu-nbd.c | 10 ++-- 8 files changed, 226 insertions(+), 149 deletions(-)