diff mbox series

[RESEND,v6,11/36] multi-process: define mpqemu-link object

Message ID 8ffca5a79b71ebe0f183707db34f59562faee71f.1587614626.git.elena.ufimtseva@oracle.com (mailing list archive)
State New, archived
Headers show
Series [RESEND,v6,01/36] memory: alloc RAM from file at offset | expand

Commit Message

Elena Ufimtseva April 23, 2020, 4:13 a.m. UTC
From: Jagannathan Raman <jag.raman@oracle.com>

Defines mpqemu-link object which forms the communication link between
QEMU & emulation program.
Adds functions to configure members of mpqemu-link object instance.
Adds functions to send and receive messages over the communication
channel.
Adds GMainLoop to handle events received on the communication channel.

Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
---
 MAINTAINERS              |   2 +
 include/io/mpqemu-link.h | 127 ++++++++++++++++
 io/Makefile.objs         |   2 +
 io/mpqemu-link.c         | 312 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 443 insertions(+)
 create mode 100644 include/io/mpqemu-link.h
 create mode 100644 io/mpqemu-link.c

Comments

Stefan Hajnoczi May 12, 2020, 8:56 a.m. UTC | #1
On Wed, Apr 22, 2020 at 09:13:46PM -0700, elena.ufimtseva@oracle.com wrote:
> From: Jagannathan Raman <jag.raman@oracle.com>
> 
> Defines mpqemu-link object which forms the communication link between
> QEMU & emulation program.
> Adds functions to configure members of mpqemu-link object instance.
> Adds functions to send and receive messages over the communication
> channel.
> Adds GMainLoop to handle events received on the communication channel.
> 
> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
> Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>

This will change a lot when integrated into the QEMU event loop so I've
skipped a lot of the code.

QIOChannel is probably the appropriate object to use instead of directly
accessing a file descriptor.

> +/**
> + * mpqemu_cmd_t:
> + *
> + * proc_cmd_t enum type to specify the command to be executed on the remote
> + * device.
> + */
> +typedef enum {
> +    INIT = 0,
> +    MAX,
> +} mpqemu_cmd_t;
> +
> +/**
> + * MPQemuMsg:
> + * @cmd: The remote command
> + * @bytestream: Indicates if the data to be shared is structured (data1)
> + *              or unstructured (data2)
> + * @size: Size of the data to be shared
> + * @data1: Structured data
> + * @fds: File descriptors to be shared with remote device
> + * @data2: Unstructured data
> + *
> + * MPQemuMsg Format of the message sent to the remote device from QEMU.
> + *
> + */
> +typedef struct {
> +    mpqemu_cmd_t cmd;

Please use an int field on the wire because the C standard says:

  Each enumerated type shall be compatible with char, a signed integer
  type, or an unsigned integer type. The choice of type is
  implementation-defined, but shall be capable of representing the
  values of all the members of the enumeration.

So the compiler may make this a char field (which would introduce
padding before the bytestream field) but if a new enum constant FOO =
0x100 is added then the compiler might change the size to 16-bit.

> +int mpqemu_msg_recv(MPQemuMsg *msg, MPQemuChannel *chan)
> +{
> +    int rc;
> +    uint8_t *data;
> +    union {
> +        char control[CMSG_SPACE(REMOTE_MAX_FDS * sizeof(int))];
> +        struct cmsghdr align;
> +    } u;
> +    struct msghdr hdr;
> +    struct cmsghdr *chdr;
> +    size_t fdsize;
> +    int sock = chan->sock;
> +    QemuMutex *lock = &chan->recv_lock;
> +
> +    struct iovec iov = {
> +        .iov_base = (char *) msg,
> +        .iov_len = MPQEMU_MSG_HDR_SIZE,
> +    };
> +
> +    memset(&hdr, 0, sizeof(hdr));
> +    memset(&u, 0, sizeof(u));
> +
> +    hdr.msg_iov = &iov;
> +    hdr.msg_iovlen = 1;
> +    hdr.msg_control = &u;
> +    hdr.msg_controllen = sizeof(u);
> +
> +    WITH_QEMU_LOCK_GUARD(lock) {
> +        do {
> +            rc = recvmsg(sock, &hdr, 0);
> +        } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
> +
> +        if (rc < 0) {

Missing rc != MPQEMU_MSG_HDR_SIZE check. If this was a short read we
should not attempt to parse uninitialized bytes in msg.

This is more defensive than relying on catching bogus input values later
on and also protects against accidentally revealing uninitialized memory
contents by observing our error handling response.

> +            qemu_log_mask(LOG_REMOTE_DEBUG, "%s - recvmsg rc is %d, "
> +                          "errno is %d, sock %d\n", __func__, rc, errno, sock);
> +            return rc;
> +        }
> +
> +        msg->num_fds = 0;
> +        for (chdr = CMSG_FIRSTHDR(&hdr); chdr != NULL;
> +             chdr = CMSG_NXTHDR(&hdr, chdr)) {
> +            if ((chdr->cmsg_level == SOL_SOCKET) &&
> +                (chdr->cmsg_type == SCM_RIGHTS)) {
> +                fdsize = chdr->cmsg_len - CMSG_LEN(0);
> +                msg->num_fds = fdsize / sizeof(int);
> +                if (msg->num_fds > REMOTE_MAX_FDS) {
> +                    qemu_log_mask(LOG_REMOTE_DEBUG,
> +                                  "%s: Max FDs exceeded\n", __func__);
> +                    return -ERANGE;
> +                }
> +
> +                memcpy(msg->fds, CMSG_DATA(chdr), fdsize);
> +                break;
> +            }
> +        }
> +
> +        if (msg->bytestream) {
> +            if (!msg->size) {
> +                qemu_mutex_unlock(lock);

Duplicate unlock, we're already inside WITH_QEMU_LOCK_GUARD().

> +                return -EINVAL;
> +            }
> +
> +            msg->data2 = calloc(1, msg->size);

What is the maximum message size? Please pick one and enforce it to
protect against huge allocations that cause us to run out of memory.

> +            data = msg->data2;
> +        } else {
> +            data = (uint8_t *)&msg->data1;

Adding a uint8_t member to the union eliminates the need for a cast:

  union {
      uint64_t u64;
      uint8_t u8;
  } data1;

  ...

  data = &msg->data1.u8;

> +        }
> +
> +        if (msg->size) {
> +            do {
> +                rc = read(sock, data, msg->size);
> +            } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
> +        }

Short reads are an error. Please check that the sum of rc values is
equal to msg->size.

> +    }
> +    return rc;
> +}
...
> +bool mpqemu_msg_valid(MPQemuMsg *msg)
> +{
> +    if (msg->cmd >= MAX) {
> +        return false;
> +    }

Checking msg->cmd < 0 is also useful here, especially when the field
type is changed to int.
Jag Raman May 12, 2020, 12:09 p.m. UTC | #2
> On May 12, 2020, at 4:56 AM, Stefan Hajnoczi <stefanha@redhat.com> wrote:
> 
> On Wed, Apr 22, 2020 at 09:13:46PM -0700, elena.ufimtseva@oracle.com wrote:
>> From: Jagannathan Raman <jag.raman@oracle.com>
>> 
>> Defines mpqemu-link object which forms the communication link between
>> QEMU & emulation program.
>> Adds functions to configure members of mpqemu-link object instance.
>> Adds functions to send and receive messages over the communication
>> channel.
>> Adds GMainLoop to handle events received on the communication channel.
>> 
>> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
>> Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
>> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> 
> This will change a lot when integrated into the QEMU event loop so I've
> skipped a lot of the code.
> 
> QIOChannel is probably the appropriate object to use instead of directly
> accessing a file descriptor.

OK, got it. Thanks!

> 
>> +/**
>> + * mpqemu_cmd_t:
>> + *
>> + * proc_cmd_t enum type to specify the command to be executed on the remote
>> + * device.
>> + */
>> +typedef enum {
>> +    INIT = 0,
>> +    MAX,
>> +} mpqemu_cmd_t;
>> +
>> +/**
>> + * MPQemuMsg:
>> + * @cmd: The remote command
>> + * @bytestream: Indicates if the data to be shared is structured (data1)
>> + *              or unstructured (data2)
>> + * @size: Size of the data to be shared
>> + * @data1: Structured data
>> + * @fds: File descriptors to be shared with remote device
>> + * @data2: Unstructured data
>> + *
>> + * MPQemuMsg Format of the message sent to the remote device from QEMU.
>> + *
>> + */
>> +typedef struct {
>> +    mpqemu_cmd_t cmd;
> 
> Please use an int field on the wire because the C standard says:
> 
>  Each enumerated type shall be compatible with char, a signed integer
>  type, or an unsigned integer type. The choice of type is
>  implementation-defined, but shall be capable of representing the
>  values of all the members of the enumeration.
> 
> So the compiler may make this a char field (which would introduce
> padding before the bytestream field) but if a new enum constant FOO =
> 0x100 is added then the compiler might change the size to 16-bit.
> 
>> +int mpqemu_msg_recv(MPQemuMsg *msg, MPQemuChannel *chan)
>> +{
>> +    int rc;
>> +    uint8_t *data;
>> +    union {
>> +        char control[CMSG_SPACE(REMOTE_MAX_FDS * sizeof(int))];
>> +        struct cmsghdr align;
>> +    } u;
>> +    struct msghdr hdr;
>> +    struct cmsghdr *chdr;
>> +    size_t fdsize;
>> +    int sock = chan->sock;
>> +    QemuMutex *lock = &chan->recv_lock;
>> +
>> +    struct iovec iov = {
>> +        .iov_base = (char *) msg,
>> +        .iov_len = MPQEMU_MSG_HDR_SIZE,
>> +    };
>> +
>> +    memset(&hdr, 0, sizeof(hdr));
>> +    memset(&u, 0, sizeof(u));
>> +
>> +    hdr.msg_iov = &iov;
>> +    hdr.msg_iovlen = 1;
>> +    hdr.msg_control = &u;
>> +    hdr.msg_controllen = sizeof(u);
>> +
>> +    WITH_QEMU_LOCK_GUARD(lock) {
>> +        do {
>> +            rc = recvmsg(sock, &hdr, 0);
>> +        } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
>> +
>> +        if (rc < 0) {
> 
> Missing rc != MPQEMU_MSG_HDR_SIZE check. If this was a short read we
> should not attempt to parse uninitialized bytes in msg.
> 
> This is more defensive than relying on catching bogus input values later
> on and also protects against accidentally revealing uninitialized memory
> contents by observing our error handling response.
> 
>> +            qemu_log_mask(LOG_REMOTE_DEBUG, "%s - recvmsg rc is %d, "
>> +                          "errno is %d, sock %d\n", __func__, rc, errno, sock);
>> +            return rc;
>> +        }
>> +
>> +        msg->num_fds = 0;
>> +        for (chdr = CMSG_FIRSTHDR(&hdr); chdr != NULL;
>> +             chdr = CMSG_NXTHDR(&hdr, chdr)) {
>> +            if ((chdr->cmsg_level == SOL_SOCKET) &&
>> +                (chdr->cmsg_type == SCM_RIGHTS)) {
>> +                fdsize = chdr->cmsg_len - CMSG_LEN(0);
>> +                msg->num_fds = fdsize / sizeof(int);
>> +                if (msg->num_fds > REMOTE_MAX_FDS) {
>> +                    qemu_log_mask(LOG_REMOTE_DEBUG,
>> +                                  "%s: Max FDs exceeded\n", __func__);
>> +                    return -ERANGE;
>> +                }
>> +
>> +                memcpy(msg->fds, CMSG_DATA(chdr), fdsize);
>> +                break;
>> +            }
>> +        }
>> +
>> +        if (msg->bytestream) {
>> +            if (!msg->size) {
>> +                qemu_mutex_unlock(lock);
> 
> Duplicate unlock, we're already inside WITH_QEMU_LOCK_GUARD().
> 
>> +                return -EINVAL;
>> +            }
>> +
>> +            msg->data2 = calloc(1, msg->size);
> 
> What is the maximum message size? Please pick one and enforce it to
> protect against huge allocations that cause us to run out of memory.
> 
>> +            data = msg->data2;
>> +        } else {
>> +            data = (uint8_t *)&msg->data1;
> 
> Adding a uint8_t member to the union eliminates the need for a cast:
> 
>  union {
>      uint64_t u64;
>      uint8_t u8;
>  } data1;
> 
>  ...
> 
>  data = &msg->data1.u8;
> 
>> +        }
>> +
>> +        if (msg->size) {
>> +            do {
>> +                rc = read(sock, data, msg->size);
>> +            } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
>> +        }
> 
> Short reads are an error. Please check that the sum of rc values is
> equal to msg->size.
> 
>> +    }
>> +    return rc;
>> +}
> ...
>> +bool mpqemu_msg_valid(MPQemuMsg *msg)
>> +{
>> +    if (msg->cmd >= MAX) {
>> +        return false;
>> +    }
> 
> Checking msg->cmd < 0 is also useful here, especially when the field
> type is changed to int.
diff mbox series

Patch

diff --git a/MAINTAINERS b/MAINTAINERS
index 965f34d4f9..93ad693da4 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -2857,6 +2857,8 @@  M: John G Johnson <john.g.johnson@oracle.com>
 S: Maintained
 F: remote/Makefile.objs
 F: remote/remote-main.c
+F: include/io/mpqemu-link.h
+F: io/mpqemu-link.c
 
 Build and test automation
 -------------------------
diff --git a/include/io/mpqemu-link.h b/include/io/mpqemu-link.h
new file mode 100644
index 0000000000..af401e640c
--- /dev/null
+++ b/include/io/mpqemu-link.h
@@ -0,0 +1,127 @@ 
+/*
+ * Communication channel between QEMU and remote device process
+ *
+ * Copyright © 2018, 2020 Oracle and/or its affiliates.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#ifndef MPQEMU_LINK_H
+#define MPQEMU_LINK_H
+
+#include "qemu/osdep.h"
+#include "qemu-common.h"
+
+#include "qom/object.h"
+#include "qemu/thread.h"
+
+#define TYPE_MPQEMU_LINK "mpqemu-link"
+#define MPQEMU_LINK(obj) \
+    OBJECT_CHECK(MPQemuLinkState, (obj), TYPE_MPQEMU_LINK)
+
+#define REMOTE_MAX_FDS 8
+
+#define MPQEMU_MSG_HDR_SIZE offsetof(MPQemuMsg, data1.u64)
+
+/**
+ * mpqemu_cmd_t:
+ *
+ * proc_cmd_t enum type to specify the command to be executed on the remote
+ * device.
+ */
+typedef enum {
+    INIT = 0,
+    MAX,
+} mpqemu_cmd_t;
+
+/**
+ * MPQemuMsg:
+ * @cmd: The remote command
+ * @bytestream: Indicates if the data to be shared is structured (data1)
+ *              or unstructured (data2)
+ * @size: Size of the data to be shared
+ * @data1: Structured data
+ * @fds: File descriptors to be shared with remote device
+ * @data2: Unstructured data
+ *
+ * MPQemuMsg Format of the message sent to the remote device from QEMU.
+ *
+ */
+typedef struct {
+    mpqemu_cmd_t cmd;
+    int bytestream;
+    size_t size;
+
+    union {
+        uint64_t u64;
+    } data1;
+
+    int fds[REMOTE_MAX_FDS];
+    int num_fds;
+
+    uint8_t *data2;
+} MPQemuMsg;
+
+/**
+ * MPQemuChannel:
+ * @gsrc: GSource object to be used by loop
+ * @gpfd: GPollFD object containing the socket & events to monitor
+ * @sock: Socket to send/receive communication, same as the one in gpfd
+ * @send_lock: Mutex to synchronize access to the send stream
+ * @recv_lock: Mutex to synchronize access to the recv stream
+ *
+ * Defines the channel that make up the communication link
+ * between QEMU and remote process
+ */
+
+typedef struct MPQemuChannel {
+    GSource gsrc;
+    GPollFD gpfd;
+    int sock;
+    QemuMutex send_lock;
+    QemuMutex recv_lock;
+} MPQemuChannel;
+
+typedef struct MPQemuLinkState MPQemuLinkState;
+
+typedef void (*mpqemu_link_callback)(GIOCondition cond, MPQemuLinkState *link,
+                                     MPQemuChannel *chan);
+
+/*
+ * MPQemuLinkState Instance info. of the communication
+ * link between QEMU and remote process. The Link could
+ * be made up of multiple channels.
+ *
+ * ctx        GMainContext to be used for communication
+ * loop       Main loop that would be used to poll for incoming data
+ * com        Communication channel to transport control messages
+ *
+ */
+
+struct MPQemuLinkState {
+    Object obj;
+
+    GMainContext *ctx;
+    GMainLoop *loop;
+
+    MPQemuChannel *com;
+
+    mpqemu_link_callback callback;
+};
+
+MPQemuLinkState *mpqemu_link_create(void);
+void mpqemu_link_finalize(MPQemuLinkState *s);
+
+void mpqemu_msg_send(MPQemuMsg *msg, MPQemuChannel *chan);
+int mpqemu_msg_recv(MPQemuMsg *msg, MPQemuChannel *chan);
+
+void mpqemu_init_channel(MPQemuLinkState *s, MPQemuChannel **chan, int fd);
+void mpqemu_destroy_channel(MPQemuChannel *chan);
+void mpqemu_link_set_callback(MPQemuLinkState *s,
+                              mpqemu_link_callback callback);
+void mpqemu_start_coms(MPQemuLinkState *s, MPQemuChannel* chan);
+bool mpqemu_msg_valid(MPQemuMsg *msg);
+
+#endif
diff --git a/io/Makefile.objs b/io/Makefile.objs
index 9a20fce4ed..5875ab0697 100644
--- a/io/Makefile.objs
+++ b/io/Makefile.objs
@@ -10,3 +10,5 @@  io-obj-y += channel-util.o
 io-obj-y += dns-resolver.o
 io-obj-y += net-listener.o
 io-obj-y += task.o
+
+io-obj-$(CONFIG_MPQEMU) += mpqemu-link.o
diff --git a/io/mpqemu-link.c b/io/mpqemu-link.c
new file mode 100644
index 0000000000..48f53a8928
--- /dev/null
+++ b/io/mpqemu-link.c
@@ -0,0 +1,312 @@ 
+/*
+ * Communication channel between QEMU and remote device process
+ *
+ * Copyright © 2018, 2020 Oracle and/or its affiliates.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "qemu-common.h"
+
+#include "qemu/module.h"
+#include "io/mpqemu-link.h"
+#include "qemu/log.h"
+#include "qemu/lockable.h"
+
+GSourceFuncs gsrc_funcs;
+
+static void mpqemu_link_inst_init(Object *obj)
+{
+    MPQemuLinkState *s = MPQEMU_LINK(obj);
+
+    s->ctx = g_main_context_default();
+    s->loop = g_main_loop_new(s->ctx, FALSE);
+}
+
+static const TypeInfo mpqemu_link_info = {
+    .name = TYPE_MPQEMU_LINK,
+    .parent = TYPE_OBJECT,
+    .instance_size = sizeof(MPQemuLinkState),
+    .instance_init = mpqemu_link_inst_init,
+};
+
+static void mpqemu_link_register_types(void)
+{
+    type_register_static(&mpqemu_link_info);
+}
+
+type_init(mpqemu_link_register_types)
+
+MPQemuLinkState *mpqemu_link_create(void)
+{
+    MPQemuLinkState *link = MPQEMU_LINK(object_new(TYPE_MPQEMU_LINK));
+
+    link->com = NULL;
+
+    return link;
+}
+
+void mpqemu_link_finalize(MPQemuLinkState *s)
+{
+    g_main_loop_unref(s->loop);
+    g_main_context_unref(s->ctx);
+    g_main_loop_quit(s->loop);
+
+    mpqemu_destroy_channel(s->com);
+
+    object_unref(OBJECT(s));
+}
+
+void mpqemu_msg_send(MPQemuMsg *msg, MPQemuChannel *chan)
+{
+    int rc;
+    uint8_t *data;
+    union {
+        char control[CMSG_SPACE(REMOTE_MAX_FDS * sizeof(int))];
+        struct cmsghdr align;
+    } u;
+    struct msghdr hdr;
+    struct cmsghdr *chdr;
+    int sock = chan->sock;
+    QemuMutex *lock = &chan->send_lock;
+
+    struct iovec iov = {
+        .iov_base = (char *) msg,
+        .iov_len = MPQEMU_MSG_HDR_SIZE,
+    };
+
+    memset(&hdr, 0, sizeof(hdr));
+    memset(&u, 0, sizeof(u));
+
+    hdr.msg_iov = &iov;
+    hdr.msg_iovlen = 1;
+
+    if (msg->num_fds > REMOTE_MAX_FDS) {
+        qemu_log_mask(LOG_REMOTE_DEBUG, "%s: Max FDs exceeded\n", __func__);
+        return;
+    }
+
+    if (msg->num_fds > 0) {
+        size_t fdsize = msg->num_fds * sizeof(int);
+
+        hdr.msg_control = &u;
+        hdr.msg_controllen = sizeof(u);
+
+        chdr = CMSG_FIRSTHDR(&hdr);
+        chdr->cmsg_len = CMSG_LEN(fdsize);
+        chdr->cmsg_level = SOL_SOCKET;
+        chdr->cmsg_type = SCM_RIGHTS;
+        memcpy(CMSG_DATA(chdr), msg->fds, fdsize);
+        hdr.msg_controllen = CMSG_SPACE(fdsize);
+    }
+
+    WITH_QEMU_LOCK_GUARD(lock) {
+        do {
+            rc = sendmsg(sock, &hdr, 0);
+        } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
+
+        if (rc < 0) {
+            qemu_log_mask(LOG_REMOTE_DEBUG, "%s - sendmsg rc is %d, "
+                          "errno is %d, sock %d\n", __func__, rc, errno, sock);
+            return;
+        }
+
+        if (msg->bytestream) {
+            data = msg->data2;
+        } else {
+            data = (uint8_t *)msg + MPQEMU_MSG_HDR_SIZE;
+        }
+
+        do {
+            rc = write(sock, data, msg->size);
+        } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
+    }
+}
+
+
+int mpqemu_msg_recv(MPQemuMsg *msg, MPQemuChannel *chan)
+{
+    int rc;
+    uint8_t *data;
+    union {
+        char control[CMSG_SPACE(REMOTE_MAX_FDS * sizeof(int))];
+        struct cmsghdr align;
+    } u;
+    struct msghdr hdr;
+    struct cmsghdr *chdr;
+    size_t fdsize;
+    int sock = chan->sock;
+    QemuMutex *lock = &chan->recv_lock;
+
+    struct iovec iov = {
+        .iov_base = (char *) msg,
+        .iov_len = MPQEMU_MSG_HDR_SIZE,
+    };
+
+    memset(&hdr, 0, sizeof(hdr));
+    memset(&u, 0, sizeof(u));
+
+    hdr.msg_iov = &iov;
+    hdr.msg_iovlen = 1;
+    hdr.msg_control = &u;
+    hdr.msg_controllen = sizeof(u);
+
+    WITH_QEMU_LOCK_GUARD(lock) {
+        do {
+            rc = recvmsg(sock, &hdr, 0);
+        } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
+
+        if (rc < 0) {
+            qemu_log_mask(LOG_REMOTE_DEBUG, "%s - recvmsg rc is %d, "
+                          "errno is %d, sock %d\n", __func__, rc, errno, sock);
+            return rc;
+        }
+
+        msg->num_fds = 0;
+        for (chdr = CMSG_FIRSTHDR(&hdr); chdr != NULL;
+             chdr = CMSG_NXTHDR(&hdr, chdr)) {
+            if ((chdr->cmsg_level == SOL_SOCKET) &&
+                (chdr->cmsg_type == SCM_RIGHTS)) {
+                fdsize = chdr->cmsg_len - CMSG_LEN(0);
+                msg->num_fds = fdsize / sizeof(int);
+                if (msg->num_fds > REMOTE_MAX_FDS) {
+                    qemu_log_mask(LOG_REMOTE_DEBUG,
+                                  "%s: Max FDs exceeded\n", __func__);
+                    return -ERANGE;
+                }
+
+                memcpy(msg->fds, CMSG_DATA(chdr), fdsize);
+                break;
+            }
+        }
+
+        if (msg->bytestream) {
+            if (!msg->size) {
+                qemu_mutex_unlock(lock);
+                return -EINVAL;
+            }
+
+            msg->data2 = calloc(1, msg->size);
+            data = msg->data2;
+        } else {
+            data = (uint8_t *)&msg->data1;
+        }
+
+        if (msg->size) {
+            do {
+                rc = read(sock, data, msg->size);
+            } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
+        }
+    }
+    return rc;
+}
+
+static gboolean mpqemu_link_handler_prepare(GSource *gsrc, gint *timeout)
+{
+    g_assert(timeout);
+
+    *timeout = -1;
+
+    return FALSE;
+}
+
+static gboolean mpqemu_link_handler_check(GSource *gsrc)
+{
+    MPQemuChannel *chan = (MPQemuChannel *)gsrc;
+
+    return chan->gpfd.events & chan->gpfd.revents;
+}
+
+static gboolean mpqemu_link_handler_dispatch(GSource *gsrc, GSourceFunc func,
+                                             gpointer data)
+{
+    MPQemuLinkState *s = (MPQemuLinkState *)data;
+    MPQemuChannel *chan = (MPQemuChannel *)gsrc;
+
+    s->callback(chan->gpfd.revents, s, chan);
+
+    if ((chan->gpfd.revents & G_IO_HUP) || (chan->gpfd.revents & G_IO_ERR)) {
+        return G_SOURCE_REMOVE;
+    }
+
+    return G_SOURCE_CONTINUE;
+}
+
+void mpqemu_link_set_callback(MPQemuLinkState *s, mpqemu_link_callback callback)
+{
+    s->callback = callback;
+}
+
+void mpqemu_init_channel(MPQemuLinkState *s, MPQemuChannel **chan, int fd)
+{
+    MPQemuChannel *src;
+
+    gsrc_funcs = (GSourceFuncs){
+        .prepare = mpqemu_link_handler_prepare,
+        .check = mpqemu_link_handler_check,
+        .dispatch = mpqemu_link_handler_dispatch,
+        .finalize = NULL,
+    };
+
+    src = (MPQemuChannel *)g_source_new(&gsrc_funcs, sizeof(MPQemuChannel));
+
+    src->sock = fd;
+    qemu_mutex_init(&src->send_lock);
+    qemu_mutex_init(&src->recv_lock);
+
+    g_source_set_callback(&src->gsrc, NULL, (gpointer)s, NULL);
+    src->gpfd.fd = fd;
+    src->gpfd.events = G_IO_IN | G_IO_HUP | G_IO_ERR;
+    g_source_add_poll(&src->gsrc, &src->gpfd);
+
+    *chan = src;
+}
+
+void mpqemu_destroy_channel(MPQemuChannel *chan)
+{
+    g_source_unref(&chan->gsrc);
+    close(chan->sock);
+    qemu_mutex_destroy(&chan->send_lock);
+    qemu_mutex_destroy(&chan->recv_lock);
+}
+
+void mpqemu_start_coms(MPQemuLinkState *s, MPQemuChannel* chan)
+{
+    g_assert(g_source_attach(&chan->gsrc, s->ctx));
+
+    g_main_loop_run(s->loop);
+}
+
+bool mpqemu_msg_valid(MPQemuMsg *msg)
+{
+    if (msg->cmd >= MAX) {
+        return false;
+    }
+
+    if (msg->bytestream) {
+        if (!msg->data2) {
+            return false;
+        }
+    } else {
+        if (msg->data2) {
+            return false;
+        }
+    }
+
+    /* Verify FDs. */
+    if (msg->num_fds >= REMOTE_MAX_FDS) {
+        return false;
+    }
+    if (msg->num_fds > 0) {
+        for (int i = 0; i < msg->num_fds; i++) {
+            if (fcntl(msg->fds[i], F_GETFL) == -1) {
+                return false;
+            }
+        }
+    }
+
+    return true;
+}