@@ -47,6 +47,8 @@ struct QIOChannelSocket {
socklen_t localAddrLen;
struct sockaddr_storage remoteAddr;
socklen_t remoteAddrLen;
+ ssize_t async_queued;
+ ssize_t async_sent;
};
@@ -26,9 +26,23 @@
#include "io/channel-watch.h"
#include "trace.h"
#include "qapi/clone-visitor.h"
+#ifdef CONFIG_LINUX
+#include <linux/errqueue.h>
+#include <poll.h>
+#endif
#define SOCKET_MAX_FDS 16
+static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp);
+
+static void qio_channel_socket_async_flush(QIOChannel *ioc,
+ Error **errp);
+
SocketAddress *
qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
Error **errp)
@@ -55,6 +69,8 @@ qio_channel_socket_new(void)
sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
sioc->fd = -1;
+ sioc->async_queued = 0;
+ sioc->async_sent = 0;
ioc = QIO_CHANNEL(sioc);
qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
@@ -140,6 +156,7 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
Error **errp)
{
int fd;
+ int ret, v = 1;
trace_qio_channel_socket_connect_sync(ioc, addr);
fd = socket_connect(addr, errp);
@@ -154,6 +171,19 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
return -1;
}
+#ifdef CONFIG_LINUX
+ if (addr->type != SOCKET_ADDRESS_TYPE_INET) {
+ return 0;
+ }
+
+ ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
+ if (ret >= 0) {
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+ klass->io_async_writev = qio_channel_socket_async_writev;
+ klass->io_async_flush = qio_channel_socket_async_flush;
+ }
+#endif
+
return 0;
}
@@ -520,12 +550,13 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
return ret;
}
-static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
- const struct iovec *iov,
- size_t niov,
- int *fds,
- size_t nfds,
- Error **errp)
+static ssize_t __qio_channel_socket_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ int flags,
+ Error **errp)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
ssize_t ret;
@@ -558,7 +589,7 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
}
retry:
- ret = sendmsg(sioc->fd, &msg, 0);
+ ret = sendmsg(sioc->fd, &msg, flags);
if (ret <= 0) {
if (errno == EAGAIN) {
return QIO_CHANNEL_ERR_BLOCK;
@@ -572,6 +603,106 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
}
return ret;
}
+
+static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ return __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, 0, errp);
+}
+
+static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+
+ sioc->async_queued++;
+
+ return __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, MSG_ZEROCOPY,
+ errp);
+}
+
+
+static void qio_channel_socket_async_flush(QIOChannel *ioc,
+ Error **errp)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+ struct msghdr msg = {};
+ struct pollfd pfd;
+ struct sock_extended_err *serr;
+ struct cmsghdr *cm;
+ char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
+ int ret;
+
+ memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
+ msg.msg_control = control;
+ msg.msg_controllen = sizeof(control);
+
+ while (sioc->async_sent < sioc->async_queued) {
+ ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
+ if (ret < 0) {
+ if (errno == EAGAIN) {
+ /* Nothing on errqueue, wait */
+ pfd.fd = sioc->fd;
+ pfd.events = 0;
+ ret = poll(&pfd, 1, 250);
+ if (ret == 0) {
+ /*
+ * Timeout : After 250ms without receiving any zerocopy
+ * notification, consider all data as sent.
+ */
+ break;
+ } else if (ret < 0 ||
+ (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
+ error_setg_errno(errp, errno,
+ "Poll error");
+ break;
+ } else {
+ continue;
+ }
+ }
+ if (errno == EINTR) {
+ continue;
+ }
+
+ error_setg_errno(errp, errno,
+ "Unable to read errqueue");
+ break;
+ }
+
+ cm = CMSG_FIRSTHDR(&msg);
+ if (cm->cmsg_level != SOL_IP &&
+ cm->cmsg_type != IP_RECVERR) {
+ error_setg_errno(errp, EPROTOTYPE,
+ "Wrong cmsg in errqueue");
+ break;
+ }
+
+ serr = (void *) CMSG_DATA(cm);
+ if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
+ error_setg_errno(errp, serr->ee_errno,
+ "Error on socket");
+ break;
+ }
+ if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
+ error_setg_errno(errp, serr->ee_origin,
+ "Error not from zerocopy");
+ break;
+ }
+
+ /* No errors, count sent ids*/
+ sioc->async_sent += serr->ee_data - serr->ee_info + 1;
+ }
+}
+
+
#else /* WIN32 */
static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
const struct iovec *iov,
Implement the new optional callbacks io_async_write and io_async_flush on QIOChannelSocket, but enables it only when MSG_ZEROCOPY feature is available in the host kernel, and TCP sockets are used. qio_channel_socket_writev() contents were moved to a helper function __qio_channel_socket_writev() which accepts an extra 'flag' argument. This helper function is used to implement qio_channel_socket_writev(), with flags = 0, keeping it's behavior unchanged, and qio_channel_socket_async_writev() with flags = MSG_ZEROCOPY. qio_channel_socket_async_flush() was implemented by reading the socket's error queue, which will have information on MSG_ZEROCOPY send completion. There is no need to worry with re-sending packets in case any error happens, as MSG_ZEROCOPY only works with TCP and it will re-tranmsmit if any error ocurs. Notes on using async_write(): - As MSG_ZEROCOPY tells the kernel to use the same user buffer to avoid copying, some caution is necessary to avoid overwriting any buffer before it's sent. If something like this happen, a newer version of the buffer may be sent instead. - If this is a problem, it's recommended to use async_flush() before freeing or re-using the buffer. . Signed-off-by: Leonardo Bras <leobras@redhat.com> --- include/io/channel-socket.h | 2 + io/channel-socket.c | 145 ++++++++++++++++++++++++++++++++++-- 2 files changed, 140 insertions(+), 7 deletions(-)