diff mbox series

[for-6.13,07/19] nfs/localio: add direct IO enablement with sync and async IO support

Message ID 20241108234002.16392-8-snitzer@kernel.org (mailing list archive)
State New
Headers show
Series nfs/nfsd: fixes and improvements for LOCALIO | expand

Commit Message

Mike Snitzer Nov. 8, 2024, 11:39 p.m. UTC
This commit simply adds the required O_DIRECT plumbing.  It doesn't
address the fact that NFS doesn't ensure all writes are page aligned
(nor device logical block size aligned as required by O_DIRECT).

Because NFS will read-modify-write for IO that isn't aligned, LOCALIO
will not use O_DIRECT semantics by default if/when an application
requests the use of O_DIRECT.  Allow the use of O_DIRECT semantics by:
1: Adding a flag to the nfs_pgio_header struct to allow the NFS
   O_DIRECT layer to signal that O_DIRECT was used by the application
2: Adding a 'localio_O_DIRECT_semantics' NFS module parameter that
   when enabled will cause LOCALIO to use O_DIRECT semantics (this may
   cause IO to fail if applications do not properly align their IO).

Adding Direct IO support helps side-step the problem that LOCALIO
currently double buffers buffered IO (by using page cache in both NFS
and the underlying filesystem).  More care is needed to craft a proper
solution for LOCALIO's redundant use of page cache for buffered IO,
e.g.: https://marc.info/?l=linux-nfs&m=171996211625151&w=2

This commit is derived from code developed by Weston Andros Adamson.

Signed-off-by: Mike Snitzer <snitzer@kernel.org>
---
 fs/nfs/direct.c         |  1 +
 fs/nfs/localio.c        | 92 ++++++++++++++++++++++++++++++++++++-----
 include/linux/nfs_xdr.h |  1 +
 3 files changed, 84 insertions(+), 10 deletions(-)

Comments

NeilBrown Nov. 11, 2024, 1:31 a.m. UTC | #1
On Sat, 09 Nov 2024, Mike Snitzer wrote:
> This commit simply adds the required O_DIRECT plumbing.  It doesn't
> address the fact that NFS doesn't ensure all writes are page aligned
> (nor device logical block size aligned as required by O_DIRECT).
> 
> Because NFS will read-modify-write for IO that isn't aligned, LOCALIO
> will not use O_DIRECT semantics by default if/when an application
> requests the use of O_DIRECT.  Allow the use of O_DIRECT semantics by:
> 1: Adding a flag to the nfs_pgio_header struct to allow the NFS
>    O_DIRECT layer to signal that O_DIRECT was used by the application
> 2: Adding a 'localio_O_DIRECT_semantics' NFS module parameter that
>    when enabled will cause LOCALIO to use O_DIRECT semantics (this may
>    cause IO to fail if applications do not properly align their IO).
> 
> Adding Direct IO support helps side-step the problem that LOCALIO
> currently double buffers buffered IO (by using page cache in both NFS
> and the underlying filesystem).  More care is needed to craft a proper
> solution for LOCALIO's redundant use of page cache for buffered IO,
> e.g.: https://marc.info/?l=linux-nfs&m=171996211625151&w=2
> 
> This commit is derived from code developed by Weston Andros Adamson.
> 
> Signed-off-by: Mike Snitzer <snitzer@kernel.org>
> ---
>  fs/nfs/direct.c         |  1 +
>  fs/nfs/localio.c        | 92 ++++++++++++++++++++++++++++++++++++-----
>  include/linux/nfs_xdr.h |  1 +
>  3 files changed, 84 insertions(+), 10 deletions(-)
> 
> diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
> index 90079ca134dd..4b92493d6ff0 100644
> --- a/fs/nfs/direct.c
> +++ b/fs/nfs/direct.c
> @@ -303,6 +303,7 @@ static void nfs_read_sync_pgio_error(struct list_head *head, int error)
>  static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
>  {
>  	get_dreq(hdr->dreq);
> +	set_bit(NFS_IOHDR_ODIRECT, &hdr->flags);
>  }
>  
>  static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
> diff --git a/fs/nfs/localio.c b/fs/nfs/localio.c
> index 4b8618cf114c..de0dcd76d84d 100644
> --- a/fs/nfs/localio.c
> +++ b/fs/nfs/localio.c
> @@ -35,6 +35,7 @@ struct nfs_local_kiocb {
>  	struct bio_vec		*bvec;
>  	struct nfs_pgio_header	*hdr;
>  	struct work_struct	work;
> +	void (*aio_complete_work)(struct work_struct *);
>  	struct nfsd_file	*localio;
>  };
>  
> @@ -48,6 +49,10 @@ struct nfs_local_fsync_ctx {
>  static bool localio_enabled __read_mostly = true;
>  module_param(localio_enabled, bool, 0644);
>  
> +static bool localio_O_DIRECT_semantics __read_mostly = false;
> +module_param(localio_O_DIRECT_semantics, bool, 0644);
> +MODULE_PARM_DESC(localio_O_DIRECT_semantics, "Use O_DIRECT semantics");

Should the text mention localio??


> +
>  static inline bool nfs_client_is_local(const struct nfs_client *clp)
>  {
>  	return !!test_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
> @@ -285,10 +290,19 @@ nfs_local_iocb_alloc(struct nfs_pgio_header *hdr,
>  		kfree(iocb);
>  		return NULL;
>  	}
> -	init_sync_kiocb(&iocb->kiocb, file);
> +
> +	if (localio_O_DIRECT_semantics &&
> +	    test_bit(NFS_IOHDR_ODIRECT, &hdr->flags)) {
> +		iocb->kiocb.ki_filp = file;
> +		iocb->kiocb.ki_flags = IOCB_DIRECT;

why isn't ki_ioprio initialised??

The rest I am not able to review as I'm that that familiar with iocb
code.

Thanks,
NeilBrown


> +	} else
> +		init_sync_kiocb(&iocb->kiocb, file);
> +
>  	iocb->kiocb.ki_pos = hdr->args.offset;
>  	iocb->hdr = hdr;
>  	iocb->kiocb.ki_flags &= ~IOCB_APPEND;
> +	iocb->aio_complete_work = NULL;
> +
>  	return iocb;
>  }
>  
> @@ -343,6 +357,18 @@ nfs_local_pgio_release(struct nfs_local_kiocb *iocb)
>  	nfs_local_hdr_release(hdr, hdr->task.tk_ops);
>  }
>  
> +/*
> + * Complete the I/O from iocb->kiocb.ki_complete()
> + *
> + * Note that this function can be called from a bottom half context,
> + * hence we need to queue the rpc_call_done() etc to a workqueue
> + */
> +static inline void nfs_local_pgio_aio_complete(struct nfs_local_kiocb *iocb)
> +{
> +	INIT_WORK(&iocb->work, iocb->aio_complete_work);
> +	queue_work(nfsiod_workqueue, &iocb->work);
> +}
> +
>  static void
>  nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
>  {
> @@ -365,6 +391,23 @@ nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
>  			status > 0 ? status : 0, hdr->res.eof);
>  }
>  
> +static void nfs_local_read_aio_complete_work(struct work_struct *work)
> +{
> +	struct nfs_local_kiocb *iocb =
> +		container_of(work, struct nfs_local_kiocb, work);
> +
> +	nfs_local_pgio_release(iocb);
> +}
> +
> +static void nfs_local_read_aio_complete(struct kiocb *kiocb, long ret)
> +{
> +	struct nfs_local_kiocb *iocb =
> +		container_of(kiocb, struct nfs_local_kiocb, kiocb);
> +
> +	nfs_local_read_done(iocb, ret);
> +	nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_read_aio_complete_work */
> +}
> +
>  static void nfs_local_call_read(struct work_struct *work)
>  {
>  	struct nfs_local_kiocb *iocb =
> @@ -379,10 +422,10 @@ static void nfs_local_call_read(struct work_struct *work)
>  	nfs_local_iter_init(&iter, iocb, READ);
>  
>  	status = filp->f_op->read_iter(&iocb->kiocb, &iter);
> -	WARN_ON_ONCE(status == -EIOCBQUEUED);
> -
> -	nfs_local_read_done(iocb, status);
> -	nfs_local_pgio_release(iocb);
> +	if (status != -EIOCBQUEUED) {
> +		nfs_local_read_done(iocb, status);
> +		nfs_local_pgio_release(iocb);
> +	}
>  
>  	revert_creds(save_cred);
>  }
> @@ -410,6 +453,11 @@ nfs_do_local_read(struct nfs_pgio_header *hdr,
>  	nfs_local_pgio_init(hdr, call_ops);
>  	hdr->res.eof = false;
>  
> +	if (iocb->kiocb.ki_flags & IOCB_DIRECT) {
> +		iocb->kiocb.ki_complete = nfs_local_read_aio_complete;
> +		iocb->aio_complete_work = nfs_local_read_aio_complete_work;
> +	}
> +
>  	INIT_WORK(&iocb->work, nfs_local_call_read);
>  	queue_work(nfslocaliod_workqueue, &iocb->work);
>  
> @@ -534,6 +582,24 @@ nfs_local_write_done(struct nfs_local_kiocb *iocb, long status)
>  	nfs_local_pgio_done(hdr, status);
>  }
>  
> +static void nfs_local_write_aio_complete_work(struct work_struct *work)
> +{
> +	struct nfs_local_kiocb *iocb =
> +		container_of(work, struct nfs_local_kiocb, work);
> +
> +	nfs_local_vfs_getattr(iocb);
> +	nfs_local_pgio_release(iocb);
> +}
> +
> +static void nfs_local_write_aio_complete(struct kiocb *kiocb, long ret)
> +{
> +	struct nfs_local_kiocb *iocb =
> +		container_of(kiocb, struct nfs_local_kiocb, kiocb);
> +
> +	nfs_local_write_done(iocb, ret);
> +	nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_write_aio_complete_work */
> +}
> +
>  static void nfs_local_call_write(struct work_struct *work)
>  {
>  	struct nfs_local_kiocb *iocb =
> @@ -552,11 +618,11 @@ static void nfs_local_call_write(struct work_struct *work)
>  	file_start_write(filp);
>  	status = filp->f_op->write_iter(&iocb->kiocb, &iter);
>  	file_end_write(filp);
> -	WARN_ON_ONCE(status == -EIOCBQUEUED);
> -
> -	nfs_local_write_done(iocb, status);
> -	nfs_local_vfs_getattr(iocb);
> -	nfs_local_pgio_release(iocb);
> +	if (status != -EIOCBQUEUED) {
> +		nfs_local_write_done(iocb, status);
> +		nfs_local_vfs_getattr(iocb);
> +		nfs_local_pgio_release(iocb);
> +	}
>  
>  	revert_creds(save_cred);
>  	current->flags = old_flags;
> @@ -592,10 +658,16 @@ nfs_do_local_write(struct nfs_pgio_header *hdr,
>  	case NFS_FILE_SYNC:
>  		iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC;
>  	}
> +
>  	nfs_local_pgio_init(hdr, call_ops);
>  
>  	nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable);
>  
> +	if (iocb->kiocb.ki_flags & IOCB_DIRECT) {
> +		iocb->kiocb.ki_complete = nfs_local_write_aio_complete;
> +		iocb->aio_complete_work = nfs_local_write_aio_complete_work;
> +	}
> +
>  	INIT_WORK(&iocb->work, nfs_local_call_write);
>  	queue_work(nfslocaliod_workqueue, &iocb->work);
>  
> diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
> index e0ae0a14257f..f30e94d105b7 100644
> --- a/include/linux/nfs_xdr.h
> +++ b/include/linux/nfs_xdr.h
> @@ -1632,6 +1632,7 @@ enum {
>  	NFS_IOHDR_RESEND_PNFS,
>  	NFS_IOHDR_RESEND_MDS,
>  	NFS_IOHDR_UNSTABLE_WRITES,
> +	NFS_IOHDR_ODIRECT,
>  };
>  
>  struct nfs_io_completion;
> -- 
> 2.44.0
> 
>
Chuck Lever Nov. 12, 2024, 2:31 p.m. UTC | #2
On Fri, Nov 08, 2024 at 06:39:50PM -0500, Mike Snitzer wrote:
> This commit simply adds the required O_DIRECT plumbing.  It doesn't
> address the fact that NFS doesn't ensure all writes are page aligned
> (nor device logical block size aligned as required by O_DIRECT).
> 
> Because NFS will read-modify-write for IO that isn't aligned, LOCALIO
> will not use O_DIRECT semantics by default if/when an application
> requests the use of O_DIRECT.  Allow the use of O_DIRECT semantics by:
> 1: Adding a flag to the nfs_pgio_header struct to allow the NFS
>    O_DIRECT layer to signal that O_DIRECT was used by the application
> 2: Adding a 'localio_O_DIRECT_semantics' NFS module parameter that
>    when enabled will cause LOCALIO to use O_DIRECT semantics (this may
>    cause IO to fail if applications do not properly align their IO).

I'm not clear why the module parameter is necessary. Applications
that use O_DIRECT, I think, assume there /are/ alignment
restrictions. Generally they are constructed to operate agnosticly
on both NFS and non-NFS file systems, so I'm not sure any such
applications expect or depend on NFS's looser I/O alignment.

If it turns out to be necessary, the module parameter should be
documented somewhere (maybe under Documentation/).


> Adding Direct IO support helps side-step the problem that LOCALIO
> currently double buffers buffered IO (by using page cache in both NFS
> and the underlying filesystem).  More care is needed to craft a proper
> solution for LOCALIO's redundant use of page cache for buffered IO,
> e.g.: https://marc.info/?l=linux-nfs&m=171996211625151&w=2

This last paragraph confused me initially. Above, the description
states that this change is to address support for applications using
O_DIRECT. But here, you mention a problem that appears to affect all
users of LOCALIO. I guess this paragraph is aspirational? I'm not
sure I would use direct I/O to address generic double-buffering --
not only does direct I/O have alignment constraints but it also
makes some assumptions about how the application is managing its I/O
buffer.

It would help me if this paragraph was dropped, since (IIUC) it
isn't directly related to the use of O_DIRECT by applications; and
perhaps add an initial paragraph that provides a problem statement.


> This commit is derived from code developed by Weston Andros Adamson.
> 
> Signed-off-by: Mike Snitzer <snitzer@kernel.org>
> ---
>  fs/nfs/direct.c         |  1 +
>  fs/nfs/localio.c        | 92 ++++++++++++++++++++++++++++++++++++-----
>  include/linux/nfs_xdr.h |  1 +
>  3 files changed, 84 insertions(+), 10 deletions(-)
> 
> diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
> index 90079ca134dd..4b92493d6ff0 100644
> --- a/fs/nfs/direct.c
> +++ b/fs/nfs/direct.c
> @@ -303,6 +303,7 @@ static void nfs_read_sync_pgio_error(struct list_head *head, int error)
>  static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
>  {
>  	get_dreq(hdr->dreq);
> +	set_bit(NFS_IOHDR_ODIRECT, &hdr->flags);
>  }
>  
>  static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
> diff --git a/fs/nfs/localio.c b/fs/nfs/localio.c
> index 4b8618cf114c..de0dcd76d84d 100644
> --- a/fs/nfs/localio.c
> +++ b/fs/nfs/localio.c
> @@ -35,6 +35,7 @@ struct nfs_local_kiocb {
>  	struct bio_vec		*bvec;
>  	struct nfs_pgio_header	*hdr;
>  	struct work_struct	work;
> +	void (*aio_complete_work)(struct work_struct *);
>  	struct nfsd_file	*localio;
>  };
>  
> @@ -48,6 +49,10 @@ struct nfs_local_fsync_ctx {
>  static bool localio_enabled __read_mostly = true;
>  module_param(localio_enabled, bool, 0644);
>  
> +static bool localio_O_DIRECT_semantics __read_mostly = false;
> +module_param(localio_O_DIRECT_semantics, bool, 0644);
> +MODULE_PARM_DESC(localio_O_DIRECT_semantics, "Use O_DIRECT semantics");
> +
>  static inline bool nfs_client_is_local(const struct nfs_client *clp)
>  {
>  	return !!test_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
> @@ -285,10 +290,19 @@ nfs_local_iocb_alloc(struct nfs_pgio_header *hdr,
>  		kfree(iocb);
>  		return NULL;
>  	}
> -	init_sync_kiocb(&iocb->kiocb, file);
> +
> +	if (localio_O_DIRECT_semantics &&
> +	    test_bit(NFS_IOHDR_ODIRECT, &hdr->flags)) {
> +		iocb->kiocb.ki_filp = file;
> +		iocb->kiocb.ki_flags = IOCB_DIRECT;
> +	} else
> +		init_sync_kiocb(&iocb->kiocb, file);
> +
>  	iocb->kiocb.ki_pos = hdr->args.offset;
>  	iocb->hdr = hdr;
>  	iocb->kiocb.ki_flags &= ~IOCB_APPEND;
> +	iocb->aio_complete_work = NULL;
> +
>  	return iocb;
>  }
>  
> @@ -343,6 +357,18 @@ nfs_local_pgio_release(struct nfs_local_kiocb *iocb)
>  	nfs_local_hdr_release(hdr, hdr->task.tk_ops);
>  }
>  
> +/*
> + * Complete the I/O from iocb->kiocb.ki_complete()
> + *
> + * Note that this function can be called from a bottom half context,
> + * hence we need to queue the rpc_call_done() etc to a workqueue
> + */
> +static inline void nfs_local_pgio_aio_complete(struct nfs_local_kiocb *iocb)
> +{
> +	INIT_WORK(&iocb->work, iocb->aio_complete_work);
> +	queue_work(nfsiod_workqueue, &iocb->work);
> +}
> +
>  static void
>  nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
>  {
> @@ -365,6 +391,23 @@ nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
>  			status > 0 ? status : 0, hdr->res.eof);
>  }
>  
> +static void nfs_local_read_aio_complete_work(struct work_struct *work)
> +{
> +	struct nfs_local_kiocb *iocb =
> +		container_of(work, struct nfs_local_kiocb, work);
> +
> +	nfs_local_pgio_release(iocb);
> +}
> +
> +static void nfs_local_read_aio_complete(struct kiocb *kiocb, long ret)
> +{
> +	struct nfs_local_kiocb *iocb =
> +		container_of(kiocb, struct nfs_local_kiocb, kiocb);
> +
> +	nfs_local_read_done(iocb, ret);
> +	nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_read_aio_complete_work */
> +}
> +
>  static void nfs_local_call_read(struct work_struct *work)
>  {
>  	struct nfs_local_kiocb *iocb =
> @@ -379,10 +422,10 @@ static void nfs_local_call_read(struct work_struct *work)
>  	nfs_local_iter_init(&iter, iocb, READ);
>  
>  	status = filp->f_op->read_iter(&iocb->kiocb, &iter);
> -	WARN_ON_ONCE(status == -EIOCBQUEUED);
> -
> -	nfs_local_read_done(iocb, status);
> -	nfs_local_pgio_release(iocb);
> +	if (status != -EIOCBQUEUED) {
> +		nfs_local_read_done(iocb, status);
> +		nfs_local_pgio_release(iocb);
> +	}
>  
>  	revert_creds(save_cred);
>  }
> @@ -410,6 +453,11 @@ nfs_do_local_read(struct nfs_pgio_header *hdr,
>  	nfs_local_pgio_init(hdr, call_ops);
>  	hdr->res.eof = false;
>  
> +	if (iocb->kiocb.ki_flags & IOCB_DIRECT) {
> +		iocb->kiocb.ki_complete = nfs_local_read_aio_complete;
> +		iocb->aio_complete_work = nfs_local_read_aio_complete_work;
> +	}
> +
>  	INIT_WORK(&iocb->work, nfs_local_call_read);
>  	queue_work(nfslocaliod_workqueue, &iocb->work);
>  
> @@ -534,6 +582,24 @@ nfs_local_write_done(struct nfs_local_kiocb *iocb, long status)
>  	nfs_local_pgio_done(hdr, status);
>  }
>  
> +static void nfs_local_write_aio_complete_work(struct work_struct *work)
> +{
> +	struct nfs_local_kiocb *iocb =
> +		container_of(work, struct nfs_local_kiocb, work);
> +
> +	nfs_local_vfs_getattr(iocb);
> +	nfs_local_pgio_release(iocb);
> +}
> +
> +static void nfs_local_write_aio_complete(struct kiocb *kiocb, long ret)
> +{
> +	struct nfs_local_kiocb *iocb =
> +		container_of(kiocb, struct nfs_local_kiocb, kiocb);
> +
> +	nfs_local_write_done(iocb, ret);
> +	nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_write_aio_complete_work */
> +}
> +
>  static void nfs_local_call_write(struct work_struct *work)
>  {
>  	struct nfs_local_kiocb *iocb =
> @@ -552,11 +618,11 @@ static void nfs_local_call_write(struct work_struct *work)
>  	file_start_write(filp);
>  	status = filp->f_op->write_iter(&iocb->kiocb, &iter);
>  	file_end_write(filp);
> -	WARN_ON_ONCE(status == -EIOCBQUEUED);
> -
> -	nfs_local_write_done(iocb, status);
> -	nfs_local_vfs_getattr(iocb);
> -	nfs_local_pgio_release(iocb);
> +	if (status != -EIOCBQUEUED) {
> +		nfs_local_write_done(iocb, status);
> +		nfs_local_vfs_getattr(iocb);
> +		nfs_local_pgio_release(iocb);
> +	}
>  
>  	revert_creds(save_cred);
>  	current->flags = old_flags;
> @@ -592,10 +658,16 @@ nfs_do_local_write(struct nfs_pgio_header *hdr,
>  	case NFS_FILE_SYNC:
>  		iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC;
>  	}
> +
>  	nfs_local_pgio_init(hdr, call_ops);
>  
>  	nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable);
>  
> +	if (iocb->kiocb.ki_flags & IOCB_DIRECT) {
> +		iocb->kiocb.ki_complete = nfs_local_write_aio_complete;
> +		iocb->aio_complete_work = nfs_local_write_aio_complete_work;
> +	}
> +
>  	INIT_WORK(&iocb->work, nfs_local_call_write);
>  	queue_work(nfslocaliod_workqueue, &iocb->work);
>  
> diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
> index e0ae0a14257f..f30e94d105b7 100644
> --- a/include/linux/nfs_xdr.h
> +++ b/include/linux/nfs_xdr.h
> @@ -1632,6 +1632,7 @@ enum {
>  	NFS_IOHDR_RESEND_PNFS,
>  	NFS_IOHDR_RESEND_MDS,
>  	NFS_IOHDR_UNSTABLE_WRITES,
> +	NFS_IOHDR_ODIRECT,
>  };
>  
>  struct nfs_io_completion;
> -- 
> 2.44.0
>
diff mbox series

Patch

diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 90079ca134dd..4b92493d6ff0 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -303,6 +303,7 @@  static void nfs_read_sync_pgio_error(struct list_head *head, int error)
 static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
 {
 	get_dreq(hdr->dreq);
+	set_bit(NFS_IOHDR_ODIRECT, &hdr->flags);
 }
 
 static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
diff --git a/fs/nfs/localio.c b/fs/nfs/localio.c
index 4b8618cf114c..de0dcd76d84d 100644
--- a/fs/nfs/localio.c
+++ b/fs/nfs/localio.c
@@ -35,6 +35,7 @@  struct nfs_local_kiocb {
 	struct bio_vec		*bvec;
 	struct nfs_pgio_header	*hdr;
 	struct work_struct	work;
+	void (*aio_complete_work)(struct work_struct *);
 	struct nfsd_file	*localio;
 };
 
@@ -48,6 +49,10 @@  struct nfs_local_fsync_ctx {
 static bool localio_enabled __read_mostly = true;
 module_param(localio_enabled, bool, 0644);
 
+static bool localio_O_DIRECT_semantics __read_mostly = false;
+module_param(localio_O_DIRECT_semantics, bool, 0644);
+MODULE_PARM_DESC(localio_O_DIRECT_semantics, "Use O_DIRECT semantics");
+
 static inline bool nfs_client_is_local(const struct nfs_client *clp)
 {
 	return !!test_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
@@ -285,10 +290,19 @@  nfs_local_iocb_alloc(struct nfs_pgio_header *hdr,
 		kfree(iocb);
 		return NULL;
 	}
-	init_sync_kiocb(&iocb->kiocb, file);
+
+	if (localio_O_DIRECT_semantics &&
+	    test_bit(NFS_IOHDR_ODIRECT, &hdr->flags)) {
+		iocb->kiocb.ki_filp = file;
+		iocb->kiocb.ki_flags = IOCB_DIRECT;
+	} else
+		init_sync_kiocb(&iocb->kiocb, file);
+
 	iocb->kiocb.ki_pos = hdr->args.offset;
 	iocb->hdr = hdr;
 	iocb->kiocb.ki_flags &= ~IOCB_APPEND;
+	iocb->aio_complete_work = NULL;
+
 	return iocb;
 }
 
@@ -343,6 +357,18 @@  nfs_local_pgio_release(struct nfs_local_kiocb *iocb)
 	nfs_local_hdr_release(hdr, hdr->task.tk_ops);
 }
 
+/*
+ * Complete the I/O from iocb->kiocb.ki_complete()
+ *
+ * Note that this function can be called from a bottom half context,
+ * hence we need to queue the rpc_call_done() etc to a workqueue
+ */
+static inline void nfs_local_pgio_aio_complete(struct nfs_local_kiocb *iocb)
+{
+	INIT_WORK(&iocb->work, iocb->aio_complete_work);
+	queue_work(nfsiod_workqueue, &iocb->work);
+}
+
 static void
 nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
 {
@@ -365,6 +391,23 @@  nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
 			status > 0 ? status : 0, hdr->res.eof);
 }
 
+static void nfs_local_read_aio_complete_work(struct work_struct *work)
+{
+	struct nfs_local_kiocb *iocb =
+		container_of(work, struct nfs_local_kiocb, work);
+
+	nfs_local_pgio_release(iocb);
+}
+
+static void nfs_local_read_aio_complete(struct kiocb *kiocb, long ret)
+{
+	struct nfs_local_kiocb *iocb =
+		container_of(kiocb, struct nfs_local_kiocb, kiocb);
+
+	nfs_local_read_done(iocb, ret);
+	nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_read_aio_complete_work */
+}
+
 static void nfs_local_call_read(struct work_struct *work)
 {
 	struct nfs_local_kiocb *iocb =
@@ -379,10 +422,10 @@  static void nfs_local_call_read(struct work_struct *work)
 	nfs_local_iter_init(&iter, iocb, READ);
 
 	status = filp->f_op->read_iter(&iocb->kiocb, &iter);
-	WARN_ON_ONCE(status == -EIOCBQUEUED);
-
-	nfs_local_read_done(iocb, status);
-	nfs_local_pgio_release(iocb);
+	if (status != -EIOCBQUEUED) {
+		nfs_local_read_done(iocb, status);
+		nfs_local_pgio_release(iocb);
+	}
 
 	revert_creds(save_cred);
 }
@@ -410,6 +453,11 @@  nfs_do_local_read(struct nfs_pgio_header *hdr,
 	nfs_local_pgio_init(hdr, call_ops);
 	hdr->res.eof = false;
 
+	if (iocb->kiocb.ki_flags & IOCB_DIRECT) {
+		iocb->kiocb.ki_complete = nfs_local_read_aio_complete;
+		iocb->aio_complete_work = nfs_local_read_aio_complete_work;
+	}
+
 	INIT_WORK(&iocb->work, nfs_local_call_read);
 	queue_work(nfslocaliod_workqueue, &iocb->work);
 
@@ -534,6 +582,24 @@  nfs_local_write_done(struct nfs_local_kiocb *iocb, long status)
 	nfs_local_pgio_done(hdr, status);
 }
 
+static void nfs_local_write_aio_complete_work(struct work_struct *work)
+{
+	struct nfs_local_kiocb *iocb =
+		container_of(work, struct nfs_local_kiocb, work);
+
+	nfs_local_vfs_getattr(iocb);
+	nfs_local_pgio_release(iocb);
+}
+
+static void nfs_local_write_aio_complete(struct kiocb *kiocb, long ret)
+{
+	struct nfs_local_kiocb *iocb =
+		container_of(kiocb, struct nfs_local_kiocb, kiocb);
+
+	nfs_local_write_done(iocb, ret);
+	nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_write_aio_complete_work */
+}
+
 static void nfs_local_call_write(struct work_struct *work)
 {
 	struct nfs_local_kiocb *iocb =
@@ -552,11 +618,11 @@  static void nfs_local_call_write(struct work_struct *work)
 	file_start_write(filp);
 	status = filp->f_op->write_iter(&iocb->kiocb, &iter);
 	file_end_write(filp);
-	WARN_ON_ONCE(status == -EIOCBQUEUED);
-
-	nfs_local_write_done(iocb, status);
-	nfs_local_vfs_getattr(iocb);
-	nfs_local_pgio_release(iocb);
+	if (status != -EIOCBQUEUED) {
+		nfs_local_write_done(iocb, status);
+		nfs_local_vfs_getattr(iocb);
+		nfs_local_pgio_release(iocb);
+	}
 
 	revert_creds(save_cred);
 	current->flags = old_flags;
@@ -592,10 +658,16 @@  nfs_do_local_write(struct nfs_pgio_header *hdr,
 	case NFS_FILE_SYNC:
 		iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC;
 	}
+
 	nfs_local_pgio_init(hdr, call_ops);
 
 	nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable);
 
+	if (iocb->kiocb.ki_flags & IOCB_DIRECT) {
+		iocb->kiocb.ki_complete = nfs_local_write_aio_complete;
+		iocb->aio_complete_work = nfs_local_write_aio_complete_work;
+	}
+
 	INIT_WORK(&iocb->work, nfs_local_call_write);
 	queue_work(nfslocaliod_workqueue, &iocb->work);
 
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index e0ae0a14257f..f30e94d105b7 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -1632,6 +1632,7 @@  enum {
 	NFS_IOHDR_RESEND_PNFS,
 	NFS_IOHDR_RESEND_MDS,
 	NFS_IOHDR_UNSTABLE_WRITES,
+	NFS_IOHDR_ODIRECT,
 };
 
 struct nfs_io_completion;