@@ -11,6 +11,7 @@
#include <linux/nfs_mount.h>
#include <linux/nfs_page.h>
#include <linux/module.h>
+#include <linux/file.h>
#include <linux/sched/mm.h>
#include <linux/sunrpc/metrics.h>
@@ -162,6 +163,21 @@ decode_name(struct xdr_stream *xdr, u32 *id)
return 0;
}
+static struct nfsd_file *
+ff_local_open_fh(struct nfs_client *clp, const struct cred *cred,
+ struct nfs_fh *fh, fmode_t mode)
+{
+ if (mode & FMODE_WRITE) {
+ /*
+ * Always request read and write access since this corresponds
+ * to a rw layout.
+ */
+ mode |= FMODE_READ;
+ }
+
+ return nfs_local_open_fh(clp, cred, fh, mode);
+}
+
static bool ff_mirror_match_fh(const struct nfs4_ff_layout_mirror *m1,
const struct nfs4_ff_layout_mirror *m2)
{
@@ -237,7 +253,7 @@ static struct nfs4_ff_layout_mirror *ff_layout_alloc_mirror(gfp_t gfp_flags)
static void ff_layout_free_mirror(struct nfs4_ff_layout_mirror *mirror)
{
- const struct cred *cred;
+ const struct cred *cred;
ff_layout_remove_mirror(mirror);
kfree(mirror->fh_versions);
@@ -1756,6 +1772,7 @@ ff_layout_read_pagelist(struct nfs_pgio_header *hdr)
struct pnfs_layout_segment *lseg = hdr->lseg;
struct nfs4_pnfs_ds *ds;
struct rpc_clnt *ds_clnt;
+ struct nfsd_file *localio;
struct nfs4_ff_layout_mirror *mirror;
const struct cred *ds_cred;
loff_t offset = hdr->args.offset;
@@ -1802,11 +1819,18 @@ ff_layout_read_pagelist(struct nfs_pgio_header *hdr)
hdr->args.offset = offset;
hdr->mds_offset = offset;
+ /* Start IO accounting for local read */
+ localio = ff_local_open_fh(ds->ds_clp, ds_cred, fh, FMODE_READ);
+ if (localio) {
+ hdr->task.tk_start = ktime_get();
+ ff_layout_read_record_layoutstats_start(&hdr->task, hdr);
+ }
+
/* Perform an asynchronous read to ds */
nfs_initiate_pgio(ds_clnt, hdr, ds_cred, ds->ds_clp->rpc_ops,
vers == 3 ? &ff_layout_read_call_ops_v3 :
&ff_layout_read_call_ops_v4,
- 0, RPC_TASK_SOFTCONN, NULL);
+ 0, RPC_TASK_SOFTCONN, localio);
put_cred(ds_cred);
return PNFS_ATTEMPTED;
@@ -1826,6 +1850,7 @@ ff_layout_write_pagelist(struct nfs_pgio_header *hdr, int sync)
struct pnfs_layout_segment *lseg = hdr->lseg;
struct nfs4_pnfs_ds *ds;
struct rpc_clnt *ds_clnt;
+ struct nfsd_file *localio;
struct nfs4_ff_layout_mirror *mirror;
const struct cred *ds_cred;
loff_t offset = hdr->args.offset;
@@ -1870,11 +1895,19 @@ ff_layout_write_pagelist(struct nfs_pgio_header *hdr, int sync)
*/
hdr->args.offset = offset;
+ /* Start IO accounting for local write */
+ localio = ff_local_open_fh(ds->ds_clp, ds_cred, fh,
+ FMODE_READ|FMODE_WRITE);
+ if (localio) {
+ hdr->task.tk_start = ktime_get();
+ ff_layout_write_record_layoutstats_start(&hdr->task, hdr);
+ }
+
/* Perform an asynchronous write */
nfs_initiate_pgio(ds_clnt, hdr, ds_cred, ds->ds_clp->rpc_ops,
vers == 3 ? &ff_layout_write_call_ops_v3 :
&ff_layout_write_call_ops_v4,
- sync, RPC_TASK_SOFTCONN, NULL);
+ sync, RPC_TASK_SOFTCONN, localio);
put_cred(ds_cred);
return PNFS_ATTEMPTED;
@@ -1908,6 +1941,7 @@ static int ff_layout_initiate_commit(struct nfs_commit_data *data, int how)
struct pnfs_layout_segment *lseg = data->lseg;
struct nfs4_pnfs_ds *ds;
struct rpc_clnt *ds_clnt;
+ struct nfsd_file *localio;
struct nfs4_ff_layout_mirror *mirror;
const struct cred *ds_cred;
u32 idx;
@@ -1946,10 +1980,18 @@ static int ff_layout_initiate_commit(struct nfs_commit_data *data, int how)
if (fh)
data->args.fh = fh;
+ /* Start IO accounting for local commit */
+ localio = ff_local_open_fh(ds->ds_clp, ds_cred, fh,
+ FMODE_READ|FMODE_WRITE);
+ if (localio) {
+ data->task.tk_start = ktime_get();
+ ff_layout_commit_record_layoutstats_start(&data->task, data);
+ }
+
ret = nfs_initiate_commit(ds_clnt, data, ds->ds_clp->rpc_ops,
vers == 3 ? &ff_layout_commit_call_ops_v3 :
&ff_layout_commit_call_ops_v4,
- how, RPC_TASK_SOFTCONN, NULL);
+ how, RPC_TASK_SOFTCONN, localio);
put_cred(ds_cred);
return ret;
out_err:
@@ -395,6 +395,12 @@ nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg,
/* connect success, check rsize/wsize limit */
if (!status) {
+ /*
+ * ds_clp is put in destroy_ds().
+ * keep ds_clp even if DS is local, so that if local IO cannot
+ * proceed somehow, we can fall back to NFS whenever we want.
+ */
+ nfs_local_probe(ds->ds_clp);
max_payload =
nfs_block_size(rpc_max_payload(ds->ds_clp->cl_rpcclient),
NULL);