@@ -50,6 +50,7 @@ typedef struct FuseExport {
struct fuse_session *fuse_session;
struct fuse_buf fuse_buf;
+ unsigned int in_flight; /* atomic */
bool mounted, fd_handler_set_up;
char *mountpoint;
@@ -78,6 +79,42 @@ static void read_from_fuse_export(void *opaque);
static bool is_regular_file(const char *path, Error **errp);
+static void fuse_export_drained_begin(void *opaque)
+{
+ FuseExport *exp = opaque;
+
+ aio_set_fd_handler(exp->common.ctx,
+ fuse_session_fd(exp->fuse_session), false,
+ NULL, NULL, NULL, NULL, NULL);
+ exp->fd_handler_set_up = false;
+}
+
+static void fuse_export_drained_end(void *opaque)
+{
+ FuseExport *exp = opaque;
+
+ /* Refresh AioContext in case it changed */
+ exp->common.ctx = blk_get_aio_context(exp->common.blk);
+
+ aio_set_fd_handler(exp->common.ctx,
+ fuse_session_fd(exp->fuse_session), false,
+ read_from_fuse_export, NULL, NULL, NULL, exp);
+ exp->fd_handler_set_up = true;
+}
+
+static bool fuse_export_drained_poll(void *opaque)
+{
+ FuseExport *exp = opaque;
+
+ return qatomic_read(&exp->in_flight) > 0;
+}
+
+static const BlockDevOps fuse_export_blk_dev_ops = {
+ .drained_begin = fuse_export_drained_begin,
+ .drained_end = fuse_export_drained_end,
+ .drained_poll = fuse_export_drained_poll,
+};
+
static int fuse_export_create(BlockExport *blk_exp,
BlockExportOptions *blk_exp_args,
Error **errp)
@@ -101,6 +138,15 @@ static int fuse_export_create(BlockExport *blk_exp,
}
}
+ blk_set_dev_ops(exp->common.blk, &fuse_export_blk_dev_ops, exp);
+
+ /*
+ * We handle draining ourselves using an in-flight counter and by disabling
+ * the FUSE fd handler. Do not queue BlockBackend requests, they need to
+ * complete so the in-flight counter reaches zero.
+ */
+ blk_set_disable_request_queuing(exp->common.blk, true);
+
init_exports_table();
/*
@@ -224,7 +270,7 @@ static int setup_fuse_export(FuseExport *exp, const char *mountpoint,
g_hash_table_insert(exports, g_strdup(mountpoint), NULL);
aio_set_fd_handler(exp->common.ctx,
- fuse_session_fd(exp->fuse_session), true,
+ fuse_session_fd(exp->fuse_session), false,
read_from_fuse_export, NULL, NULL, NULL, exp);
exp->fd_handler_set_up = true;
@@ -248,6 +294,8 @@ static void read_from_fuse_export(void *opaque)
blk_exp_ref(&exp->common);
aio_context_release(exp->common.ctx);
+ qatomic_inc(&exp->in_flight);
+
do {
ret = fuse_session_receive_buf(exp->fuse_session, &exp->fuse_buf);
} while (ret == -EINTR);
@@ -258,6 +306,10 @@ static void read_from_fuse_export(void *opaque)
fuse_session_process_buf(exp->fuse_session, &exp->fuse_buf);
out:
+ if (qatomic_fetch_dec(&exp->in_flight) == 1) {
+ aio_wait_kick(); /* wake AIO_WAIT_WHILE() */
+ }
+
aio_context_acquire(exp->common.ctx);
blk_exp_unref(&exp->common);
aio_context_release(exp->common.ctx);
@@ -272,7 +324,7 @@ static void fuse_export_shutdown(BlockExport *blk_exp)
if (exp->fd_handler_set_up) {
aio_set_fd_handler(exp->common.ctx,
- fuse_session_fd(exp->fuse_session), true,
+ fuse_session_fd(exp->fuse_session), false,
NULL, NULL, NULL, NULL, NULL);
exp->fd_handler_set_up = false;
}
@@ -291,6 +343,8 @@ static void fuse_export_delete(BlockExport *blk_exp)
{
FuseExport *exp = container_of(blk_exp, FuseExport, common);
+ blk_set_dev_ops(exp->common.blk, NULL, NULL);
+
if (exp->fuse_session) {
if (exp->mounted) {
fuse_session_unmount(exp->fuse_session);
This is part of ongoing work to remove the aio_disable_external() API. Use BlockDevOps .drained_begin/end/poll() instead of aio_set_fd_handler(is_external=true). As a side-effect the FUSE export now follows AioContext changes like the other export types. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> --- block/export/fuse.c | 58 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 2 deletions(-)