@@ -2394,35 +2394,54 @@ static void nfs_destroy_inodecache(void)
kmem_cache_destroy(nfs_inode_cachep);
}
+struct workqueue_struct *nfslocaliod_workqueue;
struct workqueue_struct *nfsiod_workqueue;
EXPORT_SYMBOL_GPL(nfsiod_workqueue);
/*
- * start up the nfsiod workqueue
- */
-static int nfsiod_start(void)
-{
- struct workqueue_struct *wq;
- dprintk("RPC: creating workqueue nfsiod\n");
- wq = alloc_workqueue("nfsiod", WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
- if (wq == NULL)
- return -ENOMEM;
- nfsiod_workqueue = wq;
- return 0;
-}
-
-/*
- * Destroy the nfsiod workqueue
+ * Destroy the nfsiod workqueues
*/
static void nfsiod_stop(void)
{
struct workqueue_struct *wq;
wq = nfsiod_workqueue;
- if (wq == NULL)
- return;
- nfsiod_workqueue = NULL;
- destroy_workqueue(wq);
+ if (wq != NULL) {
+ nfsiod_workqueue = NULL;
+ destroy_workqueue(wq);
+ }
+#if IS_ENABLED(CONFIG_NFS_LOCALIO)
+ wq = nfslocaliod_workqueue;
+ if (wq != NULL) {
+ nfslocaliod_workqueue = NULL;
+ destroy_workqueue(wq);
+ }
+#endif /* CONFIG_NFS_LOCALIO */
+}
+
+/*
+ * Start the nfsiod workqueues
+ */
+static int nfsiod_start(void)
+{
+ dprintk("RPC: creating workqueue nfsiod\n");
+ nfsiod_workqueue = alloc_workqueue("nfsiod", WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
+ if (nfsiod_workqueue == NULL)
+ return -ENOMEM;
+#if IS_ENABLED(CONFIG_NFS_LOCALIO)
+ /*
+ * localio writes need to use a normal (non-memreclaim) workqueue.
+ * When we start getting low on space, XFS goes and calls flush_work() on
+ * a non-memreclaim work queue, which causes a priority inversion problem.
+ */
+ dprintk("RPC: creating workqueue nfslocaliod\n");
+ nfslocaliod_workqueue = alloc_workqueue("nfslocaliod", WQ_UNBOUND, 0);
+ if (unlikely(nfslocaliod_workqueue == NULL)) {
+ nfsiod_stop();
+ return -ENOMEM;
+ }
+#endif /* CONFIG_NFS_LOCALIO */
+ return 0;
}
unsigned int nfs_net_id;
@@ -440,6 +440,7 @@ int nfs_check_flags(int);
/* inode.c */
extern struct workqueue_struct *nfsiod_workqueue;
+extern struct workqueue_struct *nfslocaliod_workqueue;
extern struct inode *nfs_alloc_inode(struct super_block *sb);
extern void nfs_free_inode(struct inode *);
extern int nfs_write_inode(struct inode *, struct writeback_control *);
@@ -44,6 +44,13 @@ struct nfs_local_fsync_ctx {
};
static void nfs_local_fsync_work(struct work_struct *work);
+struct nfs_local_io_args {
+ struct nfs_local_kiocb *iocb;
+ const struct cred *cred;
+ struct work_struct work;
+ struct completion *done;
+};
+
/*
* We need to translate between nfs status return values and
* the local errno values which may not be the same.
@@ -301,30 +308,55 @@ nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
status > 0 ? status : 0, hdr->res.eof);
}
-static int
-nfs_do_local_read(struct nfs_pgio_header *hdr, struct file *filp,
- const struct rpc_call_ops *call_ops)
+static void nfs_local_call_read(struct work_struct *work)
{
- struct nfs_local_kiocb *iocb;
+ struct nfs_local_io_args *args =
+ container_of(work, struct nfs_local_io_args, work);
+ struct nfs_local_kiocb *iocb = args->iocb;
+ struct file *filp = iocb->kiocb.ki_filp;
+ const struct cred *save_cred;
struct iov_iter iter;
ssize_t status;
+ current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
+ save_cred = override_creds(args->cred);
+
+ nfs_local_iter_init(&iter, iocb, READ);
+
+ status = filp->f_op->read_iter(&iocb->kiocb, &iter);
+ WARN_ON_ONCE(status == -EIOCBQUEUED);
+
+ nfs_local_read_done(iocb, status);
+ nfs_local_pgio_release(iocb);
+
+ revert_creds(save_cred);
+ complete(args->done);
+}
+
+static int nfs_do_local_read(struct nfs_pgio_header *hdr, struct file *filp,
+ const struct rpc_call_ops *call_ops)
+{
+ struct nfs_local_io_args args;
+ DECLARE_COMPLETION_ONSTACK(done);
+ struct nfs_local_kiocb *iocb;
+
dprintk("%s: vfs_read count=%u pos=%llu\n",
__func__, hdr->args.count, hdr->args.offset);
iocb = nfs_local_iocb_alloc(hdr, filp, GFP_KERNEL);
if (iocb == NULL)
return -ENOMEM;
- nfs_local_iter_init(&iter, iocb, READ);
nfs_local_pgio_init(hdr, call_ops);
hdr->res.eof = false;
- status = filp->f_op->read_iter(&iocb->kiocb, &iter);
- WARN_ON_ONCE(status == -EIOCBQUEUED);
-
- nfs_local_read_done(iocb, status);
- nfs_local_pgio_release(iocb);
+ args.iocb = iocb;
+ args.done = &done;
+ args.cred = current_cred();
+ INIT_WORK_ONSTACK(&args.work, nfs_local_call_read);
+ queue_work(nfslocaliod_workqueue, &args.work);
+ wait_for_completion(&done);
+ destroy_work_on_stack(&args.work);
return 0;
}
@@ -456,14 +488,41 @@ nfs_local_write_done(struct nfs_local_kiocb *iocb, long status)
nfs_local_pgio_done(hdr, status);
}
-static int
-nfs_do_local_write(struct nfs_pgio_header *hdr, struct file *filp,
- const struct rpc_call_ops *call_ops)
+static void nfs_local_call_write(struct work_struct *work)
{
- struct nfs_local_kiocb *iocb;
+ struct nfs_local_io_args *args =
+ container_of(work, struct nfs_local_io_args, work);
+ struct nfs_local_kiocb *iocb = args->iocb;
+ struct file *filp = iocb->kiocb.ki_filp;
+ const struct cred *save_cred;
struct iov_iter iter;
ssize_t status;
+ current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
+ save_cred = override_creds(args->cred);
+
+ nfs_local_iter_init(&iter, iocb, WRITE);
+
+ file_start_write(filp);
+ status = filp->f_op->write_iter(&iocb->kiocb, &iter);
+ file_end_write(filp);
+ WARN_ON_ONCE(status == -EIOCBQUEUED);
+
+ nfs_local_write_done(iocb, status);
+ nfs_local_vfs_getattr(iocb);
+ nfs_local_pgio_release(iocb);
+
+ revert_creds(save_cred);
+ complete(args->done);
+}
+
+static int nfs_do_local_write(struct nfs_pgio_header *hdr, struct file *filp,
+ const struct rpc_call_ops *call_ops)
+{
+ struct nfs_local_io_args args;
+ DECLARE_COMPLETION_ONSTACK(done);
+ struct nfs_local_kiocb *iocb;
+
dprintk("%s: vfs_write count=%u pos=%llu %s\n",
__func__, hdr->args.count, hdr->args.offset,
(hdr->args.stable == NFS_UNSTABLE) ? "unstable" : "stable");
@@ -471,7 +530,6 @@ nfs_do_local_write(struct nfs_pgio_header *hdr, struct file *filp,
iocb = nfs_local_iocb_alloc(hdr, filp, GFP_NOIO);
if (iocb == NULL)
return -ENOMEM;
- nfs_local_iter_init(&iter, iocb, WRITE);
switch (hdr->args.stable) {
default:
@@ -486,14 +544,13 @@ nfs_do_local_write(struct nfs_pgio_header *hdr, struct file *filp,
nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable);
- file_start_write(filp);
- status = filp->f_op->write_iter(&iocb->kiocb, &iter);
- file_end_write(filp);
- WARN_ON_ONCE(status == -EIOCBQUEUED);
-
- nfs_local_write_done(iocb, status);
- nfs_local_vfs_getattr(iocb);
- nfs_local_pgio_release(iocb);
+ args.iocb = iocb;
+ args.done = &done;
+ args.cred = current_cred();
+ INIT_WORK_ONSTACK(&args.work, nfs_local_call_write);
+ queue_work(nfslocaliod_workqueue, &args.work);
+ wait_for_completion(&done);
+ destroy_work_on_stack(&args.work);
return 0;
}