@@ -23,6 +23,7 @@
#include <asm/signal.h>
#include <linux/path.h>
#include <net/ipv6.h>
+#include <linux/completion.h>
struct rpc_inode;
@@ -31,6 +32,7 @@ struct rpc_inode;
*/
struct rpc_clnt {
atomic_t cl_count; /* Number of references */
+ atomic_t cl_active_tasks;/* Number of active tasks */
struct list_head cl_clients; /* Global list of clients */
struct list_head cl_tasks; /* List of tasks */
spinlock_t cl_lock; /* spinlock */
@@ -46,6 +48,10 @@ struct rpc_clnt {
struct rpc_stat * cl_stats; /* per-program statistics */
struct rpc_iostats * cl_metrics; /* per-client statistics */
+ unsigned long cl_flags; /* Bit flags */
+ struct rpc_wait_queue cl_waitqueue;
+ struct completion cl_completion;
+
unsigned int cl_softrtry : 1,/* soft timeouts */
cl_discrtry : 1,/* disconnect before retry */
cl_autobind : 1,/* use getport() */
@@ -65,6 +71,8 @@ struct rpc_clnt {
char *cl_principal; /* target to authenticate to */
};
+#define RPC_CLIENT_LOCKED 0
+
/*
* General RPC program info
*/
@@ -135,6 +143,9 @@ void rpc_shutdown_client(struct rpc_clnt *);
void rpc_release_client(struct rpc_clnt *);
void rpc_task_release_client(struct rpc_task *);
+int rpc_lock_client(struct rpc_clnt *clnt, unsigned long timeout);
+void rpc_unlock_client(struct rpc_clnt *clnt);
+
int rpcb_register(u32, u32, int, unsigned short);
int rpcb_v4_register(const u32 program, const u32 version,
const struct sockaddr *address,
@@ -226,6 +226,8 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, stru
atomic_set(&clnt->cl_count, 1);
+ rpc_init_wait_queue(&clnt->cl_waitqueue, "client waitqueue");
+
err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
if (err < 0)
goto out_no_path;
@@ -395,6 +397,8 @@ rpc_clone_client(struct rpc_clnt *clnt)
goto out_no_principal;
}
atomic_set(&new->cl_count, 1);
+ atomic_set(&new->cl_active_tasks, 0);
+ rpc_init_wait_queue(&new->cl_waitqueue, "client waitqueue");
err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
if (err != 0)
goto out_no_path;
@@ -571,11 +575,76 @@ out:
}
EXPORT_SYMBOL_GPL(rpc_bind_new_program);
+/**
+ * rpc_lock_client - lock the RPC client
+ * @clnt: pointer to a struct rpc_clnt
+ * @timeout: timeout parameter to pass to wait_for_completion_timeout()
+ *
+ * This function sets the RPC_CLIENT_LOCKED flag, which causes
+ * all new rpc_tasks to wait instead of executing. It then waits for
+ * any existing active tasks to complete.
+ */
+int rpc_lock_client(struct rpc_clnt *clnt, unsigned long timeout)
+{
+ if (!test_and_set_bit(RPC_CLIENT_LOCKED, &clnt->cl_flags))
+ init_completion(&clnt->cl_completion);
+
+ if (atomic_read(&clnt->cl_active_tasks) &&
+ !wait_for_completion_timeout(&clnt->cl_completion, timeout))
+ return -ETIMEDOUT;
+
+ return 0;
+}
+EXPORT_SYMBOL_GPL(rpc_lock_client);
+
+/**
+ * rpc_unlock_client
+ * @clnt: pointer to a struct rpc_clnt
+ *
+ * Clears the RPC_CLIENT_LOCKED flag, and starts any rpc_tasks that
+ * were waiting on it.
+ */
+void rpc_unlock_client(struct rpc_clnt *clnt)
+{
+ spin_lock(&clnt->cl_lock);
+ clear_bit(RPC_CLIENT_LOCKED, &clnt->cl_flags);
+ spin_unlock(&clnt->cl_lock);
+ rpc_wake_up(&clnt->cl_waitqueue);
+}
+EXPORT_SYMBOL_GPL(rpc_unlock_client);
+
+static void rpc_task_clear_active(struct rpc_task *task)
+{
+ struct rpc_clnt *clnt = task->tk_client;
+
+ if (atomic_dec_and_test(&clnt->cl_active_tasks) &&
+ test_bit(RPC_CLIENT_LOCKED, &clnt->cl_flags))
+ complete(&clnt->cl_completion);
+}
+
+static void rpc_task_set_active(struct rpc_task *task)
+{
+ struct rpc_clnt *clnt = task->tk_client;
+
+ atomic_inc(&clnt->cl_active_tasks);
+ if (unlikely(test_bit(RPC_CLIENT_LOCKED, &clnt->cl_flags))) {
+ spin_lock(&clnt->cl_lock);
+ if (test_bit(RPC_CLIENT_LOCKED, &clnt->cl_flags) &&
+ !RPC_ASSASSINATED(task)) {
+ rpc_sleep_on(&clnt->cl_waitqueue, task,
+ rpc_task_set_active);
+ rpc_task_clear_active(task);
+ }
+ spin_unlock(&clnt->cl_lock);
+ }
+}
+
void rpc_task_release_client(struct rpc_task *task)
{
struct rpc_clnt *clnt = task->tk_client;
if (clnt != NULL) {
+ rpc_task_clear_active(task);
/* Remove from client task list */
spin_lock(&clnt->cl_lock);
list_del(&task->tk_task);
@@ -599,6 +668,9 @@ void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
spin_lock(&clnt->cl_lock);
list_add_tail(&task->tk_task, &clnt->cl_tasks);
spin_unlock(&clnt->cl_lock);
+
+ /* Notify the client when this task is activated */
+ task->tk_callback = rpc_task_set_active;
}
}