@@ -186,6 +186,8 @@ extern struct nfs_client *nfs4_set_ds_client(struct nfs_client* mds_clp,
int ds_addrlen, int ds_proto,
unsigned int ds_timeo,
unsigned int ds_retrans);
+extern struct rpc_clnt *nfs4_find_or_create_ds_client(struct nfs_client *,
+ struct inode *);
#ifdef CONFIG_PROC_FS
extern int __init nfs_fs_proc_init(void);
extern void nfs_fs_proc_exit(void);
@@ -49,10 +49,83 @@ static void nfs4_shutdown_session(struct nfs_client *clp)
}
}
+
+static struct rpc_clnt *
+nfs4_find_or_add_ds_client(struct nfs_client *ds_clp, rpc_authflavor_t flavor,
+ struct rpc_clnt *new)
+{
+ struct rpc_clnt *ds_clnt, *tmp;
+
+ spin_lock(&ds_clp->cl_lock);
+ list_for_each_entry_safe(ds_clnt, tmp, &ds_clp->cl_ds_clients,
+ cl_ds_clnts) {
+ if (ds_clnt->cl_auth->au_flavor != flavor)
+ continue;
+ goto out;
+ }
+ if (new)
+ list_add(&new->cl_ds_clnts, &ds_clp->cl_ds_clients);
+ ds_clnt = new;
+out:
+ spin_unlock(&ds_clp->cl_lock); /* need some lock to protect list */
+ return ds_clnt;
+}
+
+/**
+ * Find or create a DS rpc client with th MDS server rpc client auth flavor
+ * in the nfs_client cl_ds_clients list.
+ */
+struct rpc_clnt *
+nfs4_find_or_create_ds_client(struct nfs_client *ds_clp, struct inode *inode)
+{
+ struct rpc_clnt *ds_rpc_client, *new;
+ rpc_authflavor_t flavor = NFS_SERVER(inode)->client->cl_auth->au_flavor;
+
+ ds_rpc_client = nfs4_find_or_add_ds_client(ds_clp, flavor, NULL);
+ if (ds_rpc_client != NULL)
+ goto out;
+ new = rpc_clone_client_set_auth(ds_clp->cl_rpcclient, flavor);
+ if (IS_ERR(new))
+ return new;
+ ds_rpc_client = nfs4_find_or_add_ds_client(ds_clp, flavor, new);
+ if (ds_rpc_client != new)
+ rpc_release_client(new);
+out:
+ return ds_rpc_client;
+}
+EXPORT_SYMBOL_GPL(nfs4_find_or_create_ds_client);
+
+static void
+nfs4_shutdown_ds_clients(struct nfs_client *clp)
+{
+ struct rpc_clnt *clnt;
+ LIST_HEAD(shutdown_list);
+
+ spin_lock(&clp->cl_lock);
+ while (!list_empty(&clp->cl_ds_clients)) {
+ clnt = list_entry(clp->cl_ds_clients.next,
+ struct rpc_clnt, cl_ds_clnts);
+ list_move(&clnt->cl_ds_clnts, &shutdown_list);
+ }
+ spin_unlock(&clp->cl_lock);
+
+ while (!list_empty(&shutdown_list)) {
+ clnt = list_entry(shutdown_list.next,
+ struct rpc_clnt, cl_ds_clnts);
+ list_del(&clnt->cl_ds_clnts);
+ rpc_shutdown_client(clnt);
+ }
+}
+
#else /* CONFIG_NFS_V4_1 */
static void nfs4_shutdown_session(struct nfs_client *clp)
{
}
+
+static void
+nfs4_shutdown_ds_clients(struct nfs_client *clp)
+{
+}
#endif /* CONFIG_NFS_V4_1 */
struct nfs_client *nfs4_alloc_client(const struct nfs_client_initdata *cl_init)
@@ -73,6 +146,7 @@ struct nfs_client *nfs4_alloc_client(const struct nfs_client_initdata *cl_init)
spin_lock_init(&clp->cl_lock);
INIT_DELAYED_WORK(&clp->cl_renewd, nfs4_renew_state);
+ INIT_LIST_HEAD(&clp->cl_ds_clients);
rpc_init_wait_queue(&clp->cl_rpcwaitq, "NFS client");
clp->cl_state = 1 << NFS4CLNT_LEASE_EXPIRED;
clp->cl_minorversion = cl_init->minorversion;
@@ -97,6 +171,7 @@ static void nfs4_shutdown_client(struct nfs_client *clp)
{
if (__test_and_clear_bit(NFS_CS_RENEWD, &clp->cl_res_state))
nfs4_kill_renewd(clp);
+ nfs4_shutdown_ds_clients(clp);
nfs4_shutdown_session(clp);
nfs4_destroy_callback(clp);
if (__test_and_clear_bit(NFS_CS_IDMAP, &clp->cl_res_state))
@@ -528,6 +528,7 @@ filelayout_read_pagelist(struct nfs_read_data *data)
struct nfs_pgio_header *hdr = data->header;
struct pnfs_layout_segment *lseg = hdr->lseg;
struct nfs4_pnfs_ds *ds;
+ struct rpc_clnt *ds_clnt;
loff_t offset = data->args.offset;
u32 j, idx;
struct nfs_fh *fh;
@@ -542,6 +543,11 @@ filelayout_read_pagelist(struct nfs_read_data *data)
ds = nfs4_fl_prepare_ds(lseg, idx);
if (!ds)
return PNFS_NOT_ATTEMPTED;
+
+ ds_clnt = nfs4_find_or_create_ds_client(ds->ds_clp, hdr->inode);
+ if (IS_ERR(ds_clnt))
+ return PNFS_NOT_ATTEMPTED;
+
dprintk("%s USE DS: %s cl_count %d\n", __func__,
ds->ds_remotestr, atomic_read(&ds->ds_clp->cl_count));
@@ -556,7 +562,7 @@ filelayout_read_pagelist(struct nfs_read_data *data)
data->mds_offset = offset;
/* Perform an asynchronous read to ds */
- nfs_initiate_read(ds->ds_clp->cl_rpcclient, data,
+ nfs_initiate_read(ds_clnt, data,
&filelayout_read_call_ops, RPC_TASK_SOFTCONN);
return PNFS_ATTEMPTED;
}
@@ -568,6 +574,7 @@ filelayout_write_pagelist(struct nfs_write_data *data, int sync)
struct nfs_pgio_header *hdr = data->header;
struct pnfs_layout_segment *lseg = hdr->lseg;
struct nfs4_pnfs_ds *ds;
+ struct rpc_clnt *ds_clnt;
loff_t offset = data->args.offset;
u32 j, idx;
struct nfs_fh *fh;
@@ -578,6 +585,11 @@ filelayout_write_pagelist(struct nfs_write_data *data, int sync)
ds = nfs4_fl_prepare_ds(lseg, idx);
if (!ds)
return PNFS_NOT_ATTEMPTED;
+
+ ds_clnt = nfs4_find_or_create_ds_client(ds->ds_clp, hdr->inode);
+ if (IS_ERR(ds_clnt))
+ return PNFS_NOT_ATTEMPTED;
+
dprintk("%s ino %lu sync %d req %Zu@%llu DS: %s cl_count %d\n",
__func__, hdr->inode->i_ino, sync, (size_t) data->args.count,
offset, ds->ds_remotestr, atomic_read(&ds->ds_clp->cl_count));
@@ -595,7 +607,7 @@ filelayout_write_pagelist(struct nfs_write_data *data, int sync)
data->args.offset = filelayout_get_dserver_offset(lseg, offset);
/* Perform an asynchronous write */
- nfs_initiate_write(ds->ds_clp->cl_rpcclient, data,
+ nfs_initiate_write(ds_clnt, data,
&filelayout_write_call_ops, sync,
RPC_TASK_SOFTCONN);
return PNFS_ATTEMPTED;
@@ -1105,16 +1117,19 @@ static int filelayout_initiate_commit(struct nfs_commit_data *data, int how)
{
struct pnfs_layout_segment *lseg = data->lseg;
struct nfs4_pnfs_ds *ds;
+ struct rpc_clnt *ds_clnt;
u32 idx;
struct nfs_fh *fh;
idx = calc_ds_index_from_commit(lseg, data->ds_commit_index);
ds = nfs4_fl_prepare_ds(lseg, idx);
- if (!ds) {
- prepare_to_resend_writes(data);
- filelayout_commit_release(data);
- return -EAGAIN;
- }
+ if (!ds)
+ goto out_err;
+
+ ds_clnt = nfs4_find_or_create_ds_client(ds->ds_clp, data->inode);
+ if (IS_ERR(ds_clnt))
+ goto out_err;
+
dprintk("%s ino %lu, how %d cl_count %d\n", __func__,
data->inode->i_ino, how, atomic_read(&ds->ds_clp->cl_count));
data->commit_done_cb = filelayout_commit_done_cb;
@@ -1123,9 +1138,13 @@ static int filelayout_initiate_commit(struct nfs_commit_data *data, int how)
fh = select_ds_fh_from_commit(lseg, data->ds_commit_index);
if (fh)
data->args.fh = fh;
- return nfs_initiate_commit(ds->ds_clp->cl_rpcclient, data,
+ return nfs_initiate_commit(ds_clnt, data,
&filelayout_commit_call_ops, how,
RPC_TASK_SOFTCONN);
+out_err:
+ prepare_to_resend_writes(data);
+ filelayout_commit_release(data);
+ return -EAGAIN;
}
static int
@@ -56,6 +56,7 @@ struct nfs_client {
struct rpc_cred *cl_machine_cred;
#if IS_ENABLED(CONFIG_NFS_V4)
+ struct list_head cl_ds_clients; /* auth flavor data servers */
u64 cl_clientid; /* constant */
nfs4_verifier cl_confirm; /* Clientid verifier */
unsigned long cl_state;
@@ -35,6 +35,8 @@ struct rpc_clnt {
atomic_t cl_count; /* Number of references */
struct list_head cl_clients; /* Global list of clients */
struct list_head cl_tasks; /* List of tasks */
+ struct list_head cl_ds_clnts; /* list of per auth flavor
+ data servers */
spinlock_t cl_lock; /* spinlock */
struct rpc_xprt __rcu * cl_xprt; /* transport */
struct rpc_procinfo * cl_procinfo; /* procedure info */
@@ -356,6 +356,7 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
goto out_no_stats;
clnt->cl_program = program;
INIT_LIST_HEAD(&clnt->cl_tasks);
+ INIT_LIST_HEAD(&clnt->cl_ds_clnts);
spin_lock_init(&clnt->cl_lock);
if (!xprt_bound(xprt))