@@ -93,13 +93,45 @@ static void nbd_set_handlers(NBDClient *client);
static void nbd_unset_handlers(NBDClient *client);
static void nbd_update_can_read(NBDClient *client);
-static ssize_t drop_sync(int fd, size_t size)
+static void nbd_negotiate_continue(void *opaque)
+{
+ qemu_coroutine_enter(opaque, NULL);
+}
+
+static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size)
+{
+ ssize_t ret;
+
+ assert(qemu_in_coroutine());
+ /* Negotiation are always in main loop. */
+ qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL,
+ qemu_coroutine_self());
+ ret = read_sync(fd, buffer, size);
+ qemu_set_fd_handler(fd, NULL, NULL, NULL);
+ return ret;
+
+}
+
+static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size)
+{
+ ssize_t ret;
+
+ assert(qemu_in_coroutine());
+ /* Negotiation are always in main loop. */
+ qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue,
+ qemu_coroutine_self());
+ ret = write_sync(fd, buffer, size);
+ qemu_set_fd_handler(fd, NULL, NULL, NULL);
+ return ret;
+}
+
+static ssize_t nbd_negotiate_drop_sync(int fd, size_t size)
{
ssize_t ret, dropped = size;
uint8_t *buffer = g_malloc(MIN(65536, size));
while (size > 0) {
- ret = read_sync(fd, buffer, MIN(65536, size));
+ ret = nbd_negotiate_read(fd, buffer, MIN(65536, size));
if (ret < 0) {
g_free(buffer);
return ret;
@@ -140,96 +172,96 @@ static ssize_t drop_sync(int fd, size_t size)
*/
-static int nbd_send_rep(int csock, uint32_t type, uint32_t opt)
+static int nbd_negotiate_send_rep(int csock, uint32_t type, uint32_t opt)
{
uint64_t magic;
uint32_t len;
magic = cpu_to_be64(NBD_REP_MAGIC);
- if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+ if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
LOG("write failed (rep magic)");
return -EINVAL;
}
opt = cpu_to_be32(opt);
- if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+ if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
LOG("write failed (rep opt)");
return -EINVAL;
}
type = cpu_to_be32(type);
- if (write_sync(csock, &type, sizeof(type)) != sizeof(type)) {
+ if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
LOG("write failed (rep type)");
return -EINVAL;
}
len = cpu_to_be32(0);
- if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) {
+ if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
LOG("write failed (rep data length)");
return -EINVAL;
}
return 0;
}
-static int nbd_send_rep_list(int csock, NBDExport *exp)
+static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp)
{
uint64_t magic, name_len;
uint32_t opt, type, len;
name_len = strlen(exp->name);
magic = cpu_to_be64(NBD_REP_MAGIC);
- if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+ if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
LOG("write failed (magic)");
return -EINVAL;
}
opt = cpu_to_be32(NBD_OPT_LIST);
- if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+ if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
LOG("write failed (opt)");
return -EINVAL;
}
type = cpu_to_be32(NBD_REP_SERVER);
- if (write_sync(csock, &type, sizeof(type)) != sizeof(type)) {
+ if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
LOG("write failed (reply type)");
return -EINVAL;
}
len = cpu_to_be32(name_len + sizeof(len));
- if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) {
+ if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
LOG("write failed (length)");
return -EINVAL;
}
len = cpu_to_be32(name_len);
- if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) {
+ if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
LOG("write failed (length)");
return -EINVAL;
}
- if (write_sync(csock, exp->name, name_len) != name_len) {
+ if (nbd_negotiate_write(csock, exp->name, name_len) != name_len) {
LOG("write failed (buffer)");
return -EINVAL;
}
return 0;
}
-static int nbd_handle_list(NBDClient *client, uint32_t length)
+static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length)
{
int csock;
NBDExport *exp;
csock = client->sock;
if (length) {
- if (drop_sync(csock, length) != length) {
+ if (nbd_negotiate_drop_sync(csock, length) != length) {
return -EIO;
}
- return nbd_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
+ return nbd_negotiate_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
}
/* For each export, send a NBD_REP_SERVER reply. */
QTAILQ_FOREACH(exp, &exports, next) {
- if (nbd_send_rep_list(csock, exp)) {
+ if (nbd_negotiate_send_rep_list(csock, exp)) {
return -EINVAL;
}
}
/* Finish with a NBD_REP_ACK. */
- return nbd_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST);
+ return nbd_negotiate_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST);
}
-static int nbd_handle_export_name(NBDClient *client, uint32_t length)
+static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
{
int rc = -EINVAL, csock = client->sock;
char name[256];
@@ -242,7 +274,7 @@ static int nbd_handle_export_name(NBDClient *client, uint32_t length)
LOG("Bad length received");
goto fail;
}
- if (read_sync(csock, name, length) != length) {
+ if (nbd_negotiate_read(csock, name, length) != length) {
LOG("read failed");
goto fail;
}
@@ -261,7 +293,7 @@ fail:
return rc;
}
-static int nbd_receive_options(NBDClient *client)
+static int nbd_negotiate_options(NBDClient *client)
{
int csock = client->sock;
uint32_t flags;
@@ -280,7 +312,7 @@ static int nbd_receive_options(NBDClient *client)
... Rest of request
*/
- if (read_sync(csock, &flags, sizeof(flags)) != sizeof(flags)) {
+ if (nbd_negotiate_read(csock, &flags, sizeof(flags)) != sizeof(flags)) {
LOG("read failed");
return -EIO;
}
@@ -296,7 +328,7 @@ static int nbd_receive_options(NBDClient *client)
uint32_t tmp, length;
uint64_t magic;
- if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+ if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) {
LOG("read failed");
return -EINVAL;
}
@@ -306,12 +338,13 @@ static int nbd_receive_options(NBDClient *client)
return -EINVAL;
}
- if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+ if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
LOG("read failed");
return -EINVAL;
}
- if (read_sync(csock, &length, sizeof(length)) != sizeof(length)) {
+ if (nbd_negotiate_read(csock, &length,
+ sizeof(length)) != sizeof(length)) {
LOG("read failed");
return -EINVAL;
}
@@ -320,7 +353,7 @@ static int nbd_receive_options(NBDClient *client)
TRACE("Checking option");
switch (be32_to_cpu(tmp)) {
case NBD_OPT_LIST:
- ret = nbd_handle_list(client, length);
+ ret = nbd_negotiate_handle_list(client, length);
if (ret < 0) {
return ret;
}
@@ -330,19 +363,25 @@ static int nbd_receive_options(NBDClient *client)
return -EINVAL;
case NBD_OPT_EXPORT_NAME:
- return nbd_handle_export_name(client, length);
+ return nbd_negotiate_handle_export_name(client, length);
default:
tmp = be32_to_cpu(tmp);
LOG("Unsupported option 0x%x", tmp);
- nbd_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp);
+ nbd_negotiate_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp);
return -EINVAL;
}
}
}
-static int nbd_send_negotiate(NBDClient *client)
+typedef struct {
+ NBDClient *client;
+ Coroutine *co;
+} NBDClientNewData;
+
+static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
{
+ NBDClient *client = data->client;
int csock = client->sock;
char buf[8 + 8 + 8 + 128];
int rc;
@@ -368,7 +407,6 @@ static int nbd_send_negotiate(NBDClient *client)
[28 .. 151] reserved (0)
*/
- qemu_set_block(csock);
rc = -EINVAL;
TRACE("Beginning negotiation.");
@@ -385,16 +423,16 @@ static int nbd_send_negotiate(NBDClient *client)
}
if (client->exp) {
- if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) {
+ if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) {
LOG("write failed");
goto fail;
}
} else {
- if (write_sync(csock, buf, 18) != 18) {
+ if (nbd_negotiate_write(csock, buf, 18) != 18) {
LOG("write failed");
goto fail;
}
- rc = nbd_receive_options(client);
+ rc = nbd_negotiate_options(client);
if (rc != 0) {
LOG("option negotiation failed");
goto fail;
@@ -403,7 +441,8 @@ static int nbd_send_negotiate(NBDClient *client)
assert ((client->exp->nbdflags & ~65535) == 0);
cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size);
cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
- if (write_sync(csock, buf + 18, sizeof(buf) - 18) != sizeof(buf) - 18) {
+ if (nbd_negotiate_write(csock, buf + 18,
+ sizeof(buf) - 18) != sizeof(buf) - 18) {
LOG("write failed");
goto fail;
}
@@ -412,7 +451,6 @@ static int nbd_send_negotiate(NBDClient *client)
TRACE("Negotiation succeeded.");
rc = 0;
fail:
- qemu_set_nonblock(csock);
return rc;
}
@@ -1028,25 +1066,43 @@ static void nbd_update_can_read(NBDClient *client)
}
}
+static coroutine_fn void nbd_co_client_start(void *opaque)
+{
+ NBDClientNewData *data = opaque;
+ NBDClient *client = data->client;
+ NBDExport *exp = client->exp;
+
+ if (exp) {
+ nbd_export_get(exp);
+ }
+ if (nbd_negotiate(data)) {
+ shutdown(client->sock, 2);
+ client->close(client);
+ goto out;
+ }
+ qemu_co_mutex_init(&client->send_lock);
+ nbd_set_handlers(client);
+
+ if (exp) {
+ QTAILQ_INSERT_TAIL(&exp->clients, client, next);
+ }
+out:
+ g_free(data);
+}
+
void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
{
NBDClient *client;
+ NBDClientNewData *data = g_new(NBDClientNewData, 1);
+
client = g_malloc0(sizeof(NBDClient));
client->refcount = 1;
client->exp = exp;
client->sock = csock;
client->can_read = true;
- if (nbd_send_negotiate(client)) {
- shutdown(client->sock, 2);
- close_fn(client);
- return;
- }
client->close = close_fn;
- qemu_co_mutex_init(&client->send_lock);
- nbd_set_handlers(client);
- if (exp) {
- QTAILQ_INSERT_TAIL(&exp->clients, client, next);
- nbd_export_get(exp);
- }
+ data->client = client;
+ data->co = qemu_coroutine_create(nbd_co_client_start);
+ qemu_coroutine_enter(data->co, data);
}