diff mbox series

[v7,07/21] multi-process: add co-routines to communicate with remote

Message ID d7013c71ba09fb73bc61a725088bebd91f0fda30.1593273671.git.elena.ufimtseva@oracle.com (mailing list archive)
State New, archived
Headers show
Series Initial support for multi-process qemu | expand

Commit Message

Elena Ufimtseva June 27, 2020, 5:09 p.m. UTC
From: Elena Ufimtseva <elena.ufimtseva@oracle.com>

Add co-routines to communicate with the remote
process to avoid blocking the main loop during the message exchanges.
To be used by proxy device.

Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
---
 include/io/mpqemu-link.h | 16 +++++++++
 io/mpqemu-link.c         | 78 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 94 insertions(+)

Comments

Stefan Hajnoczi June 30, 2020, 6:31 p.m. UTC | #1
On Sat, Jun 27, 2020 at 10:09:29AM -0700, elena.ufimtseva@oracle.com wrote:
> From: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> 
> Add co-routines to communicate with the remote
> process to avoid blocking the main loop during the message exchanges.
> To be used by proxy device.

The nested aio_poll() in this patch could be a problem. I've highlighted
it here so that others skimming through this series can join the
discussion without reading all my review comments:

  qemu_coroutine_enter(req.co);
  while (!req.finished) {
      aio_poll(qemu_get_aio_context(), false);

This is called from the proxy device, which means the vCPU thread. The
QEMU global mutex is held here.

aio_poll() does not release the mutex, so other vCPU threads will not be
able to run QEMU code until mpqemu_msg_send_reply_co() completes and the
QEMU global mutex has been released again.

Our goal is to do blocking socket I/O here since our vCPU thread cannot
make progress until the remote device replies. This means we can forget
about QEMU's event loop and simply do blocking I/O from the vCPU thread.
The QEMU global mutex can be dropped around the send/recv but a
per-socket mutex is needed to protect against multiple vCPU threads
interleaving requests.

I think this boils down to:

  /*
   * Blocking version of send+recv for the proxy device. Called from
   * vCPU thread where we cannot make progress until the remote device
   * has replied. Releases and re-acquires the QEMU global mutex.
   *
   * NOTE(review): sketch only. mplink is not declared in this snippet and
   * blocking_send()/blocking_recv() stand in for plain blocking socket I/O
   * helpers (presumably; TODO confirm). errp is not set on failure yet.
   */
  uint64_t mpqemu_msg_send_and_await_reply(MPQemuMsg *msg, QIOChannel *ioc,
                                           Error **errp)
  {
      uint64_t ret;

      /* Let other vCPU threads run QEMU code during the blocking I/O. */
      qemu_mutex_unlock_iothread();

      /* Per-socket lock so requests from several vCPUs do not interleave. */
      qemu_mutex_lock(&mplink->io_mutex);
      blocking_send(ioc, msg);
      ret = blocking_recv(ioc);
      qemu_mutex_unlock(&mplink->io_mutex);

      qemu_mutex_lock_iothread();

      return ret;
  }

This is very simple! The tricky thing is combining this approach with
mplink I/O that happens in the event loop. The heartbeat is one example.
I don't remember the protocol details enough to know how the other
message exchanges work. Code running in the event loop is not allowed to
block, so it really needs to use coroutines.

> 
> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
> Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
> ---
>  include/io/mpqemu-link.h | 16 +++++++++
>  io/mpqemu-link.c         | 78 ++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 94 insertions(+)
> 
> diff --git a/include/io/mpqemu-link.h b/include/io/mpqemu-link.h
> index 1542e8ed07..52aa89656c 100644
> --- a/include/io/mpqemu-link.h
> +++ b/include/io/mpqemu-link.h
> @@ -17,6 +17,7 @@
>  #include "qom/object.h"
>  #include "qemu/thread.h"
>  #include "io/channel.h"
> +#include "io/channel-socket.h"
>  
>  #define REMOTE_MAX_FDS 8
>  
> @@ -30,6 +31,7 @@
>   */
>  typedef enum {
>      INIT = 0,
> +    RET_MSG,
>      MAX = INT_MAX,
>  } MPQemuCmd;
>  
> @@ -67,6 +69,20 @@ typedef struct {
>      uint8_t *data2;
>  } MPQemuMsg;
>  
> +struct MPQemuRequest {
> +    MPQemuMsg *msg;
> +    QIOChannelSocket *sioc;
> +    Coroutine *co;
> +    bool finished;
> +    int error;
> +    long ret;
> +};
> +
> +typedef struct MPQemuRequest MPQemuRequest;
> +
> +uint64_t mpqemu_msg_send_reply_co(MPQemuMsg *msg, QIOChannel *ioc,

This function sends a message and waits for the reply. The name confuses
me, I would guess that this function sends a reply. Perhaps
mpqemu_msg_send_and_await_reply() is a clearer name.

> +                                  Error **errp);

Functions with "co" in their name run in coroutine context. This
function does not. Please remove the _co from the name to avoid
confusion.

> +
>  void mpqemu_msg_send(MPQemuMsg *msg, QIOChannel *ioc);
>  int mpqemu_msg_recv(MPQemuMsg *msg, QIOChannel *ioc);
>  
> diff --git a/io/mpqemu-link.c b/io/mpqemu-link.c
> index bfc542b5fd..c430b4d6a2 100644
> --- a/io/mpqemu-link.c
> +++ b/io/mpqemu-link.c
> @@ -16,6 +16,8 @@
>  #include "qapi/error.h"
>  #include "qemu/iov.h"
>  #include "qemu/error-report.h"
> +#include "qemu/main-loop.h"
> +#include "io/channel-socket.h"
>  
>  void mpqemu_msg_send(MPQemuMsg *msg, QIOChannel *ioc)
>  {
> @@ -118,6 +120,82 @@ int mpqemu_msg_recv(MPQemuMsg *msg, QIOChannel *ioc)
>      return 0;
>  }
>  
> +/* Use in proxy only as it clobbers fd handlers. */
> +static void coroutine_fn mpqemu_msg_send_co(void *data)
> +{
> +    MPQemuRequest *req = (MPQemuRequest *)data;
> +    MPQemuMsg msg_reply = {0};
> +    long ret = -EINVAL;
> +
> +    if (!req->sioc) {
> +        error_report("No channel available to send command %d",
> +                     req->msg->cmd);

Can this ever happen?

> +        atomic_mb_set(&req->finished, true);

Why is atomic_mb_set() used here?

> +        req->error = -EINVAL;
> +        return;
> +    }
> +
> +    req->co = qemu_coroutine_self();
> +    mpqemu_msg_send(req->msg, QIO_CHANNEL(req->sioc));
> +
> +    yield_until_fd_readable(req->sioc->fd);
> +
> +    ret = mpqemu_msg_recv(&msg_reply, QIO_CHANNEL(req->sioc));

mpqemu_msg_recv() already yields when called from a coroutine. Why is
yield_until_fd_readable() called explicitly here?

After removing the yield_until_fd_readable() it will also be possible to
use QIOChannel instead of the more specific QIOChannelSocket. That will
make the code a little simpler because the QIO_CHANNEL() casts can be
removed.

> +    if (ret < 0) {
> +        error_report("ERROR: failed to get a reply for command %d, \
> +                     errno %s, ret is %ld",

C doesn't support backslash string wrapping in a useful way. The
backslash tells the preprocessor to wrap the line but the string will
contain the leading whitespace.

This format string is equivalent to:

  "ERROR: failed to get a reply for command %d,                    errno %s, ret is %ld"

A common way of wrapping strings is:

  error_report("ERROR: failed to get a reply for command %d, "
               "errno %s, ret is %ld",

> +                     req->msg->cmd, strerror(errno), ret);
> +        req->error = -errno;

s/errno/ret/

> +    } else {
> +        if (!mpqemu_msg_valid(&msg_reply) || msg_reply.cmd != RET_MSG) {
> +            error_report("ERROR: Invalid reply received for command %d",
> +                         req->msg->cmd);
> +            req->error = -EINVAL;
> +        } else {
> +            req->ret = msg_reply.data1.u64;
> +        }
> +    }
> +    atomic_mb_set(&req->finished, true);
> +}
> +
> +/*
> + * Create if needed and enter co-routine to send the message to the
> + * remote channel ioc and wait for the reply.
> + * Resturns the value from the reply message, sets the error on failure.

s/Resturns/Returns/

> + */
> +
> +uint64_t mpqemu_msg_send_reply_co(MPQemuMsg *msg, QIOChannel *ioc,
> +                                  Error **errp)
> +{
> +    MPQemuRequest req = {0};
> +    uint64_t ret = UINT64_MAX;
> +
> +    req.sioc = QIO_CHANNEL_SOCKET(ioc);
> +    if (!req.sioc) {
> +        return ret;

The doc comment says "sets the error on failure" but this error return
does not set errp.

> +    }
> +
> +    req.msg = msg;
> +    req.ret = 0;
> +    req.finished = false;
> +
> +    if (!req.co) {

req.co is always NULL here because it's a local variable initialized to
0 at the beginning of this function.

> +        req.co = qemu_coroutine_create(mpqemu_msg_send_co, &req);
> +    }
> +
> +    qemu_coroutine_enter(req.co);
> +    while (!req.finished) {
> +        aio_poll(qemu_get_aio_context(), false);

The second argument must be true so that aio_poll() waits for activity
instead of returning immediately. If it returns immediately then this
loop will consume 100% CPU.

> +    }
> +    if (req.error) {
> +        error_setg(errp, "Error exchanging message with remote process, "\
> +                        "socket %d, error %d", req.sioc->fd, req.error);
> +    }
> +    ret = req.ret;
> +
> +    return ret;
> +}
> +
>  bool mpqemu_msg_valid(MPQemuMsg *msg)
>  {
>      if (msg->cmd >= MAX && msg->cmd < 0) {
> -- 
> 2.25.GIT
>
diff mbox series

Patch

diff --git a/include/io/mpqemu-link.h b/include/io/mpqemu-link.h
index 1542e8ed07..52aa89656c 100644
--- a/include/io/mpqemu-link.h
+++ b/include/io/mpqemu-link.h
@@ -17,6 +17,7 @@ 
 #include "qom/object.h"
 #include "qemu/thread.h"
 #include "io/channel.h"
+#include "io/channel-socket.h"
 
 #define REMOTE_MAX_FDS 8
 
@@ -30,6 +31,7 @@ 
  */
 typedef enum {
     INIT = 0,
+    RET_MSG,
     MAX = INT_MAX,
 } MPQemuCmd;
 
@@ -67,6 +69,20 @@  typedef struct {
     uint8_t *data2;
 } MPQemuMsg;
 
+struct MPQemuRequest { /* one mpqemu_msg_send_reply_co() exchange */
+    MPQemuMsg *msg;          /* message to send */
+    QIOChannelSocket *sioc;  /* channel used for both send and recv */
+    Coroutine *co;           /* coroutine performing the exchange */
+    bool finished;           /* set true by the coroutine when done */
+    int error;               /* 0 on success, else negative code */
+    long ret;                /* data1.u64 of the RET_MSG reply */
+};
+
+typedef struct MPQemuRequest MPQemuRequest;
+/* Not a coroutine_fn despite its name: sends msg, then awaits the reply. */
+uint64_t mpqemu_msg_send_reply_co(MPQemuMsg *msg, QIOChannel *ioc,
+                                  Error **errp);
+
 void mpqemu_msg_send(MPQemuMsg *msg, QIOChannel *ioc);
 int mpqemu_msg_recv(MPQemuMsg *msg, QIOChannel *ioc);
 
diff --git a/io/mpqemu-link.c b/io/mpqemu-link.c
index bfc542b5fd..c430b4d6a2 100644
--- a/io/mpqemu-link.c
+++ b/io/mpqemu-link.c
@@ -16,6 +16,8 @@ 
 #include "qapi/error.h"
 #include "qemu/iov.h"
 #include "qemu/error-report.h"
+#include "qemu/main-loop.h"
+#include "io/channel-socket.h"
 
 void mpqemu_msg_send(MPQemuMsg *msg, QIOChannel *ioc)
 {
@@ -118,6 +120,82 @@  int mpqemu_msg_recv(MPQemuMsg *msg, QIOChannel *ioc)
     return 0;
 }
 
+/* Coroutine of mpqemu_msg_send_reply_co(). Proxy only: clobbers fd handlers */
+static void coroutine_fn mpqemu_msg_send_co(void *data)
+{
+    MPQemuRequest *req = (MPQemuRequest *)data;
+    MPQemuMsg msg_reply = {0};
+    long ret = -EINVAL;
+
+    if (!req->sioc) {
+        error_report("No channel available to send command %d",
+                     req->msg->cmd);
+        atomic_mb_set(&req->finished, true);
+        req->error = -EINVAL;
+        return;
+    }
+
+    req->co = qemu_coroutine_self();
+    mpqemu_msg_send(req->msg, QIO_CHANNEL(req->sioc));
+    /* Wait for the reply; possibly redundant if mpqemu_msg_recv() yields. */
+    yield_until_fd_readable(req->sioc->fd);
+    /* NOTE(review): the "\" line wrap below puts spaces into the message. */
+    ret = mpqemu_msg_recv(&msg_reply, QIO_CHANNEL(req->sioc));
+    if (ret < 0) {
+        error_report("ERROR: failed to get a reply for command %d, \
+                     errno %s, ret is %ld",
+                     req->msg->cmd, strerror(errno), ret);
+        req->error = -errno; /* NOTE(review): likely should use ret */
+    } else {
+        if (!mpqemu_msg_valid(&msg_reply) || msg_reply.cmd != RET_MSG) {
+            error_report("ERROR: Invalid reply received for command %d",
+                         req->msg->cmd);
+            req->error = -EINVAL;
+        } else {
+            req->ret = msg_reply.data1.u64;
+        }
+    }
+    atomic_mb_set(&req->finished, true); /* ends the caller's poll loop */
+}
+
+/*
+ * Send msg on socket channel ioc from a new coroutine and poll the main
+ * AioContext until that coroutine finishes. Returns data1.u64 of a valid
+ * RET_MSG reply. On failure sets errp and returns 0; returns UINT64_MAX
+ * without setting errp if casting ioc to QIOChannelSocket yields NULL.
+ */
+uint64_t mpqemu_msg_send_reply_co(MPQemuMsg *msg, QIOChannel *ioc,
+                                  Error **errp)
+{
+    MPQemuRequest req = {0};
+    uint64_t ret = UINT64_MAX;
+
+    req.sioc = QIO_CHANNEL_SOCKET(ioc);
+    if (!req.sioc) {
+        return ret;
+    }
+
+    req.msg = msg;
+    req.ret = 0;
+    req.finished = false;
+
+    if (!req.co) { /* always true: req was zero-initialized above */
+        req.co = qemu_coroutine_create(mpqemu_msg_send_co, &req);
+    }
+    /* NOTE(review): aio_poll(..., false) never blocks, so this loop spins. */
+    qemu_coroutine_enter(req.co);
+    while (!req.finished) {
+        aio_poll(qemu_get_aio_context(), false);
+    }
+    if (req.error) {
+        error_setg(errp, "Error exchanging message with remote process, "\
+                        "socket %d, error %d", req.sioc->fd, req.error);
+    }
+    ret = req.ret;
+
+    return ret;
+}
+
 bool mpqemu_msg_valid(MPQemuMsg *msg)
 {
     if (msg->cmd >= MAX && msg->cmd < 0) {