@@ -136,6 +136,14 @@ struct QIOChannelClass {
IOHandler *io_read,
IOHandler *io_write,
void *opaque);
+ ssize_t (*io_async_writev)(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp);
+ void (*io_async_flush)(QIOChannel *ioc,
+ Error **errp);
};
/* General I/O handling functions */
@@ -255,12 +263,17 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
* or QIO_CHANNEL_ERR_BLOCK if no data is can be sent
* and the channel is non-blocking
*/
-ssize_t qio_channel_writev_full(QIOChannel *ioc,
- const struct iovec *iov,
- size_t niov,
- int *fds,
- size_t nfds,
- Error **errp);
+ssize_t __qio_channel_writev_full(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ bool async,
+ Error **errp);
+#define qio_channel_writev_full(ioc, iov, niov, fds, nfds, errp) \
+ __qio_channel_writev_full(ioc, iov, niov, fds, nfds, false, errp)
+#define qio_channel_async_writev_full(ioc, iov, niov, fds, nfds, errp) \
+ __qio_channel_writev_full(ioc, iov, niov, fds, nfds, true, errp)
/**
* qio_channel_readv_all_eof:
@@ -339,10 +352,15 @@ int qio_channel_readv_all(QIOChannel *ioc,
*
* Returns: 0 if all bytes were written, or -1 on error
*/
-int qio_channel_writev_all(QIOChannel *ioc,
- const struct iovec *iov,
- size_t niov,
- Error **erp);
+int __qio_channel_writev_all(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ bool async,
+ Error **erp);
+#define qio_channel_writev_all(ioc, iov, niov, erp) \
+ __qio_channel_writev_all(ioc, iov, niov, false, erp)
+#define qio_channel_async_writev_all(ioc, iov, niov, erp) \
+ __qio_channel_writev_all(ioc, iov, niov, true, erp)
/**
* qio_channel_readv:
@@ -849,10 +867,55 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
* Returns: 0 if all bytes were written, or -1 on error
*/
-int qio_channel_writev_full_all(QIOChannel *ioc,
- const struct iovec *iov,
- size_t niov,
- int *fds, size_t nfds,
- Error **errp);
+int __qio_channel_writev_full_all(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds, size_t nfds,
+ bool async, Error **errp);
+#define qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
+ __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, false, errp)
+#define qio_channel_async_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
+ __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, true, errp)
+
+/**
+ * qio_channel_async_writev:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to write data from
+ * @niov: the length of the @iov array
+ * @fds: an array of file handles to send
+ * @nfds: number of file handles in @fds
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Behaves like qio_channel_writev_full, but will send
+ * data asynchronously, this meaning this function
+ * may return before the data is actually sent.
+ *
+ * If at some point it's necessary wait for all data to be
+ * sent, use qio_channel_async_flush().
+ *
+ * If not implemented, falls back to the default writev
+ */
+
+ssize_t qio_channel_async_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp);
+
+/**
+ * qio_channel_async_flush:
+ * @ioc: the channel object
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Will lock until every packet queued with qio_channel_async_writev()
+ * is sent.
+ *
+ * If not implemented, returns without changing anything.
+ */
+
+void qio_channel_async_flush(QIOChannel *ioc,
+ Error **errp);
+
#endif /* QIO_CHANNEL_H */
@@ -67,12 +67,13 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
}
-ssize_t qio_channel_writev_full(QIOChannel *ioc,
- const struct iovec *iov,
- size_t niov,
- int *fds,
- size_t nfds,
- Error **errp)
+ssize_t __qio_channel_writev_full(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ bool async,
+ Error **errp)
{
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
@@ -83,6 +84,10 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
return -1;
}
+ if (async) {
+ return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
+ }
+
return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
}
@@ -212,19 +217,20 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
return ret;
}
-int qio_channel_writev_all(QIOChannel *ioc,
- const struct iovec *iov,
- size_t niov,
- Error **errp)
+int __qio_channel_writev_all(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ bool async,
+ Error **errp)
{
- return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
+ return __qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, async, errp);
}
-int qio_channel_writev_full_all(QIOChannel *ioc,
+int __qio_channel_writev_full_all(QIOChannel *ioc,
const struct iovec *iov,
size_t niov,
int *fds, size_t nfds,
- Error **errp)
+ bool async, Error **errp)
{
int ret = -1;
struct iovec *local_iov = g_new(struct iovec, niov);
@@ -237,8 +243,8 @@ int qio_channel_writev_full_all(QIOChannel *ioc,
while (nlocal_iov > 0) {
ssize_t len;
- len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
- errp);
+ len = __qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
+ async, errp);
if (len == QIO_CHANNEL_ERR_BLOCK) {
if (qemu_in_coroutine()) {
qio_channel_yield(ioc, G_IO_OUT);
@@ -474,6 +480,36 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
}
+ssize_t qio_channel_async_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+ if (!klass->io_async_writev) {
+ return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
+ }
+
+ return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
+}
+
+
+void qio_channel_async_flush(QIOChannel *ioc,
+ Error **errp)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+ if (!klass->io_async_flush) {
+ return;
+ }
+
+ klass->io_async_flush(ioc, errp);
+}
+
+
static void qio_channel_restart_read(void *opaque)
{
QIOChannel *ioc = opaque;
Adds io_async_writev and io_async_flush as optional callback to QIOChannelClass, allowing the implementation of asynchronous writes by subclasses. How to use them: - Write data using qio_channel_async_writev(), - Wait write completion with qio_channel_async_flush(). Notes: Some asynchronous implementations may benefit from zerocopy mechanisms, so it's recommended to keep the write buffer untouched until the return of qio_channel_async_flush(). As the new callbacks are optional, if a subclass does not implement them there will be a fallback to the mandatory synchronous implementation: - io_async_writev will fallback to io_writev, - io_async_flush will return without changing anything. This makes simpler for the user to make use of the asynchronous implementation. Also, some functions like qio_channel_writev_full_all() were adapted to offer an async version, and make better use of the new callbacks. Signed-off-by: Leonardo Bras <leobras@redhat.com> --- include/io/channel.h | 93 +++++++++++++++++++++++++++++++++++++------- io/channel.c | 66 ++++++++++++++++++++++++------- 2 files changed, 129 insertions(+), 30 deletions(-)