
[RFC,3/5] SUNRPC: Remove the bh-safe lock requirement on xprt->transport_lock

Message ID 20190503111841.4391-4-trond.myklebust@hammerspace.com (mailing list archive)
State New, archived
Series bh-safe lock removal for SUNRPC

Commit Message

Trond Myklebust May 3, 2019, 11:18 a.m. UTC
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/xprt.c                          | 61 ++++++++++------------
 net/sunrpc/xprtrdma/rpc_rdma.c             |  4 +-
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |  4 +-
 net/sunrpc/xprtrdma/svc_rdma_transport.c   |  8 +--
 net/sunrpc/xprtsock.c                      | 23 ++++----
 5 files changed, 47 insertions(+), 53 deletions(-)
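
As the series title indicates, xprt->transport_lock no longer needs the BH-disabling lock variants once the earlier patches in this series have moved receive processing and task wakeups out of softirq context. A minimal sketch of the before/after idiom, in generic kernel-style C (illustrative names only, not code from this patch):

	#include <linux/spinlock.h>

	static DEFINE_SPINLOCK(transport_lock);
	static unsigned long sends;

	/* Before: a softirq on this CPU could also take the lock, so
	 * process-context users had to disable bottom halves around
	 * the critical section. */
	static void update_stats_bh_safe(void)
	{
		spin_lock_bh(&transport_lock);
		sends++;
		spin_unlock_bh(&transport_lock);
	}

	/* After: every user runs in process context, so the cheaper
	 * plain lock suffices. */
	static void update_stats_plain(void)
	{
		spin_lock(&transport_lock);
		sends++;
		spin_unlock(&transport_lock);
	}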

Comments

Chuck Lever III May 3, 2019, 2:21 p.m. UTC | #1
> On May 3, 2019, at 7:18 AM, Trond Myklebust <trondmy@gmail.com> wrote:
> 
> Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
> ---
> net/sunrpc/xprt.c                          | 61 ++++++++++------------
> net/sunrpc/xprtrdma/rpc_rdma.c             |  4 +-
> net/sunrpc/xprtrdma/svc_rdma_backchannel.c |  4 +-
> net/sunrpc/xprtrdma/svc_rdma_transport.c   |  8 +--
> net/sunrpc/xprtsock.c                      | 23 ++++----
> 5 files changed, 47 insertions(+), 53 deletions(-)

For rpc_rdma.c and svc_rdma_backchannel.c:

   Reviewed-by: Chuck Lever <chuck.lever@oracle.com>

For svc_rdma_transport.c:

These locks are server-side only. AFAICS it's not safe
to leave BHs enabled here. Can you drop these hunks?
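
For context on the objection, a generic sketch of the hazard (illustrative names, not code from this series): a lock that can also be acquired from softirq context must be taken with BHs disabled by process-context users; otherwise a softirq arriving on the same CPU can spin forever on a lock that CPU already holds.

	#include <linux/spinlock.h>

	static DEFINE_SPINLOCK(sc_lock);

	/* Process context: plain spin_lock() here could deadlock if
	 * the softirq path below runs on the same CPU while the lock
	 * is held, so bottom halves must be disabled first. */
	static void process_context_path(void)
	{
		spin_lock_bh(&sc_lock);
		/* ... manipulate the accept queue ... */
		spin_unlock_bh(&sc_lock);
	}

	/* Softirq context: already running as a bottom half, so the
	 * plain lock variant is used here. */
	static void softirq_context_path(void)
	{
		spin_lock(&sc_lock);
		/* ... */
		spin_unlock(&sc_lock);
	}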


> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> index bc1c8247750d..b87d185cf010 100644
> --- a/net/sunrpc/xprt.c
> +++ b/net/sunrpc/xprt.c
> @@ -301,9 +301,9 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
> 
> 	if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
> 		return 1;
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	retval = xprt->ops->reserve_xprt(xprt, task);
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> 	return retval;
> }
> 
> @@ -380,9 +380,9 @@ static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *ta
> {
> 	if (xprt->snd_task != task)
> 		return;
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	xprt->ops->release_xprt(xprt, task);
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> }
> 
> /*
> @@ -434,9 +434,9 @@ xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
> 
> 	if (req->rq_cong)
> 		return true;
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	ret = __xprt_get_cong(xprt, req) != 0;
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> 	return ret;
> }
> EXPORT_SYMBOL_GPL(xprt_request_get_cong);
> @@ -463,9 +463,9 @@ static void
> xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
> {
> 	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
> -		spin_lock_bh(&xprt->transport_lock);
> +		spin_lock(&xprt->transport_lock);
> 		__xprt_lock_write_next_cong(xprt);
> -		spin_unlock_bh(&xprt->transport_lock);
> +		spin_unlock(&xprt->transport_lock);
> 	}
> }
> 
> @@ -562,9 +562,9 @@ bool xprt_write_space(struct rpc_xprt *xprt)
> 
> 	if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
> 		return false;
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	ret = xprt_clear_write_space_locked(xprt);
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> 	return ret;
> }
> EXPORT_SYMBOL_GPL(xprt_write_space);
> @@ -633,9 +633,9 @@ int xprt_adjust_timeout(struct rpc_rqst *req)
> 		req->rq_retries = 0;
> 		xprt_reset_majortimeo(req);
> 		/* Reset the RTT counters == "slow start" */
> -		spin_lock_bh(&xprt->transport_lock);
> +		spin_lock(&xprt->transport_lock);
> 		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
> -		spin_unlock_bh(&xprt->transport_lock);
> +		spin_unlock(&xprt->transport_lock);
> 		status = -ETIMEDOUT;
> 	}
> 
> @@ -667,11 +667,11 @@ static void xprt_autoclose(struct work_struct *work)
> void xprt_disconnect_done(struct rpc_xprt *xprt)
> {
> 	dprintk("RPC:       disconnected transport %p\n", xprt);
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	xprt_clear_connected(xprt);
> 	xprt_clear_write_space_locked(xprt);
> 	xprt_wake_pending_tasks(xprt, -ENOTCONN);
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> }
> EXPORT_SYMBOL_GPL(xprt_disconnect_done);
> 
> @@ -683,7 +683,7 @@ EXPORT_SYMBOL_GPL(xprt_disconnect_done);
> void xprt_force_disconnect(struct rpc_xprt *xprt)
> {
> 	/* Don't race with the test_bit() in xprt_clear_locked() */
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
> 	/* Try to schedule an autoclose RPC call */
> 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
> @@ -691,7 +691,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
> 	else if (xprt->snd_task)
> 		rpc_wake_up_queued_task_set_status(&xprt->pending,
> 				xprt->snd_task, -ENOTCONN);
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> }
> EXPORT_SYMBOL_GPL(xprt_force_disconnect);
> 
> @@ -725,7 +725,7 @@ xprt_request_retransmit_after_disconnect(struct rpc_task *task)
> void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
> {
> 	/* Don't race with the test_bit() in xprt_clear_locked() */
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	if (cookie != xprt->connect_cookie)
> 		goto out;
> 	if (test_bit(XPRT_CLOSING, &xprt->state))
> @@ -736,7 +736,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
> 		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
> 	xprt_wake_pending_tasks(xprt, -EAGAIN);
> out:
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> }
> 
> static bool
> @@ -758,18 +758,13 @@ xprt_init_autodisconnect(struct timer_list *t)
> {
> 	struct rpc_xprt *xprt = from_timer(xprt, t, timer);
> 
> -	spin_lock(&xprt->transport_lock);
> 	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
> -		goto out_abort;
> +		return;
> 	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
> 	xprt->last_used = jiffies;
> 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
> -		goto out_abort;
> -	spin_unlock(&xprt->transport_lock);
> +		return;
> 	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
> -	return;
> -out_abort:
> -	spin_unlock(&xprt->transport_lock);
> }
> 
> bool xprt_lock_connect(struct rpc_xprt *xprt,
> @@ -778,7 +773,7 @@ bool xprt_lock_connect(struct rpc_xprt *xprt,
> {
> 	bool ret = false;
> 
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	if (!test_bit(XPRT_LOCKED, &xprt->state))
> 		goto out;
> 	if (xprt->snd_task != task)
> @@ -786,13 +781,13 @@ bool xprt_lock_connect(struct rpc_xprt *xprt,
> 	xprt->snd_task = cookie;
> 	ret = true;
> out:
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> 	return ret;
> }
> 
> void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
> {
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	if (xprt->snd_task != cookie)
> 		goto out;
> 	if (!test_bit(XPRT_LOCKED, &xprt->state))
> @@ -801,7 +796,7 @@ void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
> 	xprt->ops->release_xprt(xprt, NULL);
> 	xprt_schedule_autodisconnect(xprt);
> out:
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> 	wake_up_bit(&xprt->state, XPRT_LOCKED);
> }
> 
> @@ -1411,14 +1406,14 @@ xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
> 	xprt_inject_disconnect(xprt);
> 
> 	task->tk_flags |= RPC_TASK_SENT;
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 
> 	xprt->stat.sends++;
> 	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
> 	xprt->stat.bklog_u += xprt->backlog.qlen;
> 	xprt->stat.sending_u += xprt->sending.qlen;
> 	xprt->stat.pending_u += xprt->pending.qlen;
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> 
> 	req->rq_connect_cookie = connect_cookie;
> out_dequeue:
> @@ -1769,13 +1764,13 @@ void xprt_release(struct rpc_task *task)
> 	else if (task->tk_client)
> 		rpc_count_iostats(task, task->tk_client->cl_metrics);
> 	xprt_request_dequeue_all(task, req);
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	xprt->ops->release_xprt(xprt, task);
> 	if (xprt->ops->release_request)
> 		xprt->ops->release_request(task);
> 	xprt->last_used = jiffies;
> 	xprt_schedule_autodisconnect(xprt);
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> 	if (req->rq_buffer)
> 		xprt->ops->buf_free(task);
> 	xprt_inject_disconnect(xprt);
> diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
> index 6c1fb270f127..26419be782d0 100644
> --- a/net/sunrpc/xprtrdma/rpc_rdma.c
> +++ b/net/sunrpc/xprtrdma/rpc_rdma.c
> @@ -1359,10 +1359,10 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
> 	else if (credits > buf->rb_max_requests)
> 		credits = buf->rb_max_requests;
> 	if (buf->rb_credits != credits) {
> -		spin_lock_bh(&xprt->transport_lock);
> +		spin_lock(&xprt->transport_lock);
> 		buf->rb_credits = credits;
> 		xprt->cwnd = credits << RPC_CWNDSHIFT;
> -		spin_unlock_bh(&xprt->transport_lock);
> +		spin_unlock(&xprt->transport_lock);
> 	}
> 
> 	req = rpcr_to_rdmar(rqst);
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> index bed57d8b5c19..d1fcc41d5eb5 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> @@ -72,9 +72,9 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
> 	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
> 		credits = r_xprt->rx_buf.rb_bc_max_requests;
> 
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	xprt->cwnd = credits << RPC_CWNDSHIFT;
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> 
> 	spin_lock(&xprt->queue_lock);
> 	ret = 0;
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> index 027a3b07d329..18ffc6190ea9 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> @@ -221,9 +221,9 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id,
> 	 * Enqueue the new transport on the accept queue of the listening
> 	 * transport
> 	 */
> -	spin_lock_bh(&listen_xprt->sc_lock);
> +	spin_lock(&listen_xprt->sc_lock);
> 	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
> -	spin_unlock_bh(&listen_xprt->sc_lock);
> +	spin_unlock(&listen_xprt->sc_lock);
> 
> 	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
> 	svc_xprt_enqueue(&listen_xprt->sc_xprt);
> @@ -396,7 +396,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
> 	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
> 	clear_bit(XPT_CONN, &xprt->xpt_flags);
> 	/* Get the next entry off the accept list */
> -	spin_lock_bh(&listen_rdma->sc_lock);
> +	spin_lock(&listen_rdma->sc_lock);
> 	if (!list_empty(&listen_rdma->sc_accept_q)) {
> 		newxprt = list_entry(listen_rdma->sc_accept_q.next,
> 				     struct svcxprt_rdma, sc_accept_q);
> @@ -404,7 +404,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
> 	}
> 	if (!list_empty(&listen_rdma->sc_accept_q))
> 		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
> -	spin_unlock_bh(&listen_rdma->sc_lock);
> +	spin_unlock(&listen_rdma->sc_lock);
> 	if (!newxprt)
> 		return NULL;
> 
> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
> index e0195b1a0c18..d7b8e95a61c8 100644
> --- a/net/sunrpc/xprtsock.c
> +++ b/net/sunrpc/xprtsock.c
> @@ -880,7 +880,7 @@ static int xs_nospace(struct rpc_rqst *req)
> 			req->rq_slen);
> 
> 	/* Protect against races with write_space */
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 
> 	/* Don't race with disconnect */
> 	if (xprt_connected(xprt)) {
> @@ -890,7 +890,7 @@ static int xs_nospace(struct rpc_rqst *req)
> 	} else
> 		ret = -ENOTCONN;
> 
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> 
> 	/* Race breaker in case memory is freed before above code is called */
> 	if (ret == -EAGAIN) {
> @@ -1344,6 +1344,7 @@ static void xs_destroy(struct rpc_xprt *xprt)
> 	cancel_delayed_work_sync(&transport->connect_worker);
> 	xs_close(xprt);
> 	cancel_work_sync(&transport->recv_worker);
> +	cancel_work_sync(&transport->error_worker);
> 	xs_xprt_free(xprt);
> 	module_put(THIS_MODULE);
> }
> @@ -1397,9 +1398,9 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
> 	}
> 
> 
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	xprt_adjust_cwnd(xprt, task, copied);
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> 	spin_lock(&xprt->queue_lock);
> 	xprt_complete_rqst(task, copied);
> 	__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
> @@ -1509,7 +1510,6 @@ static void xs_tcp_state_change(struct sock *sk)
> 	trace_rpc_socket_state_change(xprt, sk->sk_socket);
> 	switch (sk->sk_state) {
> 	case TCP_ESTABLISHED:
> -		spin_lock(&xprt->transport_lock);
> 		if (!xprt_test_and_set_connected(xprt)) {
> 			xprt->connect_cookie++;
> 			clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
> @@ -1520,7 +1520,6 @@ static void xs_tcp_state_change(struct sock *sk)
> 						   xprt->stat.connect_start;
> 			xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING);
> 		}
> -		spin_unlock(&xprt->transport_lock);
> 		break;
> 	case TCP_FIN_WAIT1:
> 		/* The client initiated a shutdown of the socket */
> @@ -1677,9 +1676,9 @@ static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t
>  */
> static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task)
> {
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	xprt_adjust_cwnd(xprt, task, -ETIMEDOUT);
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> }
> 
> static int xs_get_random_port(void)
> @@ -2214,13 +2213,13 @@ static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
> 	unsigned int opt_on = 1;
> 	unsigned int timeo;
> 
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	keepidle = DIV_ROUND_UP(xprt->timeout->to_initval, HZ);
> 	keepcnt = xprt->timeout->to_retries + 1;
> 	timeo = jiffies_to_msecs(xprt->timeout->to_initval) *
> 		(xprt->timeout->to_retries + 1);
> 	clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> 
> 	/* TCP Keepalive options */
> 	kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
> @@ -2245,7 +2244,7 @@ static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt,
> 	struct rpc_timeout to;
> 	unsigned long initval;
> 
> -	spin_lock_bh(&xprt->transport_lock);
> +	spin_lock(&xprt->transport_lock);
> 	if (reconnect_timeout < xprt->max_reconnect_timeout)
> 		xprt->max_reconnect_timeout = reconnect_timeout;
> 	if (connect_timeout < xprt->connect_timeout) {
> @@ -2262,7 +2261,7 @@ static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt,
> 		xprt->connect_timeout = connect_timeout;
> 	}
> 	set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
> -	spin_unlock_bh(&xprt->transport_lock);
> +	spin_unlock(&xprt->transport_lock);
> }
> 
> static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
> -- 
> 2.21.0
> 

--
Chuck Lever
Trond Myklebust May 3, 2019, 3:28 p.m. UTC | #2
On Fri, 2019-05-03 at 10:21 -0400, Chuck Lever wrote:
> > On May 3, 2019, at 7:18 AM, Trond Myklebust <trondmy@gmail.com>
> > wrote:
> > 
> > Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
> > ---
> > net/sunrpc/xprt.c                          | 61 ++++++++++------------
> > net/sunrpc/xprtrdma/rpc_rdma.c             |  4 +-
> > net/sunrpc/xprtrdma/svc_rdma_backchannel.c |  4 +-
> > net/sunrpc/xprtrdma/svc_rdma_transport.c   |  8 +--
> > net/sunrpc/xprtsock.c                      | 23 ++++----
> > 5 files changed, 47 insertions(+), 53 deletions(-)
> 
> For rpc_rdma.c and svc_rdma_backchannel.c:
> 
>    Reviewed-by: Chuck Lever <chuck.lever@oracle.com>
> 
> For svc_rdma_transport.c:
> 
> These locks are server-side only. AFAICS it's not safe
> to leave BHs enabled here. Can you drop these hunks?

Oops... Yes, I don't know why I mistook that for the xprt->transport_lock...
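
They are easy to conflate: sc_lock lives in the server-side RDMA transport, while transport_lock lives in the generic client-side transport. Abridged sketch of the relevant fields (illustrative, not verbatim from the headers):

	/* include/linux/sunrpc/xprt.h -- client-side transport */
	struct rpc_xprt {
		spinlock_t	transport_lock;	/* protects transport state */
		/* ... */
	};

	/* include/linux/sunrpc/svc_rdma.h -- server-side RDMA transport */
	struct svcxprt_rdma {
		spinlock_t	sc_lock;	/* protects sc_accept_q */
		/* ... */
	};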

You mean these 3 hunks, right?

> > diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > index 027a3b07d329..18ffc6190ea9 100644
> > --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > @@ -221,9 +221,9 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id,
> > 	 * Enqueue the new transport on the accept queue of the listening
> > 	 * transport
> > 	 */
> > -	spin_lock_bh(&listen_xprt->sc_lock);
> > +	spin_lock(&listen_xprt->sc_lock);
> > 	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
> > -	spin_unlock_bh(&listen_xprt->sc_lock);
> > +	spin_unlock(&listen_xprt->sc_lock);
> > 
> > 	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
> > 	svc_xprt_enqueue(&listen_xprt->sc_xprt);
> > @@ -396,7 +396,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
> > 	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
> > 	clear_bit(XPT_CONN, &xprt->xpt_flags);
> > 	/* Get the next entry off the accept list */
> > -	spin_lock_bh(&listen_rdma->sc_lock);
> > +	spin_lock(&listen_rdma->sc_lock);
> > 	if (!list_empty(&listen_rdma->sc_accept_q)) {
> > 		newxprt = list_entry(listen_rdma->sc_accept_q.next,
> > 				     struct svcxprt_rdma, sc_accept_q);
> > @@ -404,7 +404,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
> > 	}
> > 	if (!list_empty(&listen_rdma->sc_accept_q))
> > 		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
> > -	spin_unlock_bh(&listen_rdma->sc_lock);
> > +	spin_unlock(&listen_rdma->sc_lock);
> > 	if (!newxprt)
> > 		return NULL;
> > 
Chuck Lever III May 3, 2019, 3:43 p.m. UTC | #3
> On May 3, 2019, at 11:28 AM, Trond Myklebust <trondmy@gmail.com> wrote:
> 
> On Fri, 2019-05-03 at 10:21 -0400, Chuck Lever wrote:
>>> On May 3, 2019, at 7:18 AM, Trond Myklebust <trondmy@gmail.com>
>>> wrote:
>>> 
>>> Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
>>> ---
>>> net/sunrpc/xprt.c                          | 61 ++++++++++------------
>>> net/sunrpc/xprtrdma/rpc_rdma.c             |  4 +-
>>> net/sunrpc/xprtrdma/svc_rdma_backchannel.c |  4 +-
>>> net/sunrpc/xprtrdma/svc_rdma_transport.c   |  8 +--
>>> net/sunrpc/xprtsock.c                      | 23 ++++----
>>> 5 files changed, 47 insertions(+), 53 deletions(-)
>> 
>> For rpc_rdma.c and svc_rdma_backchannel.c:
>> 
>>   Reviewed-by: Chuck Lever <chuck.lever@oracle.com>
>> 
>> For svc_rdma_transport.c:
>> 
>> These locks are server-side only. AFAICS it's not safe
>> to leave BHs enabled here. Can you drop these hunks?
> 
> Oops... Yes, I don't know why I mistook that for the xprt->transport_lock...
> 
> You mean these 3 hunks, right?

Yes.

> 
>>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
>>> index 027a3b07d329..18ffc6190ea9 100644
>>> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
>>> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
>>> @@ -221,9 +221,9 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id,
>>> 	 * Enqueue the new transport on the accept queue of the listening
>>> 	 * transport
>>> 	 */
>>> -	spin_lock_bh(&listen_xprt->sc_lock);
>>> +	spin_lock(&listen_xprt->sc_lock);
>>> 	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
>>> -	spin_unlock_bh(&listen_xprt->sc_lock);
>>> +	spin_unlock(&listen_xprt->sc_lock);
>>> 
>>> 	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
>>> 	svc_xprt_enqueue(&listen_xprt->sc_xprt);
>>> @@ -396,7 +396,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
>>> 	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
>>> 	clear_bit(XPT_CONN, &xprt->xpt_flags);
>>> 	/* Get the next entry off the accept list */
>>> -	spin_lock_bh(&listen_rdma->sc_lock);
>>> +	spin_lock(&listen_rdma->sc_lock);
>>> 	if (!list_empty(&listen_rdma->sc_accept_q)) {
>>> 		newxprt = list_entry(listen_rdma->sc_accept_q.next,
>>> 				     struct svcxprt_rdma, sc_accept_q);
>>> @@ -404,7 +404,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
>>> 	}
>>> 	if (!list_empty(&listen_rdma->sc_accept_q))
>>> 		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
>>> -	spin_unlock_bh(&listen_rdma->sc_lock);
>>> +	spin_unlock(&listen_rdma->sc_lock);
>>> 	if (!newxprt)
>>> 		return NULL;
>>> 
> 
> -- 
> Trond Myklebust
> Linux NFS client maintainer, Hammerspace
> trond.myklebust@hammerspace.com

Patch

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index bc1c8247750d..b87d185cf010 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -301,9 +301,9 @@  static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
 
 	if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
 		return 1;
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	retval = xprt->ops->reserve_xprt(xprt, task);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 	return retval;
 }
 
@@ -380,9 +380,9 @@  static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *ta
 {
 	if (xprt->snd_task != task)
 		return;
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	xprt->ops->release_xprt(xprt, task);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 }
 
 /*
@@ -434,9 +434,9 @@  xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
 
 	if (req->rq_cong)
 		return true;
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	ret = __xprt_get_cong(xprt, req) != 0;
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 	return ret;
 }
 EXPORT_SYMBOL_GPL(xprt_request_get_cong);
@@ -463,9 +463,9 @@  static void
 xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
 {
 	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
-		spin_lock_bh(&xprt->transport_lock);
+		spin_lock(&xprt->transport_lock);
 		__xprt_lock_write_next_cong(xprt);
-		spin_unlock_bh(&xprt->transport_lock);
+		spin_unlock(&xprt->transport_lock);
 	}
 }
 
@@ -562,9 +562,9 @@  bool xprt_write_space(struct rpc_xprt *xprt)
 
 	if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
 		return false;
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	ret = xprt_clear_write_space_locked(xprt);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 	return ret;
 }
 EXPORT_SYMBOL_GPL(xprt_write_space);
@@ -633,9 +633,9 @@  int xprt_adjust_timeout(struct rpc_rqst *req)
 		req->rq_retries = 0;
 		xprt_reset_majortimeo(req);
 		/* Reset the RTT counters == "slow start" */
-		spin_lock_bh(&xprt->transport_lock);
+		spin_lock(&xprt->transport_lock);
 		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
-		spin_unlock_bh(&xprt->transport_lock);
+		spin_unlock(&xprt->transport_lock);
 		status = -ETIMEDOUT;
 	}
 
@@ -667,11 +667,11 @@  static void xprt_autoclose(struct work_struct *work)
 void xprt_disconnect_done(struct rpc_xprt *xprt)
 {
 	dprintk("RPC:       disconnected transport %p\n", xprt);
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	xprt_clear_connected(xprt);
 	xprt_clear_write_space_locked(xprt);
 	xprt_wake_pending_tasks(xprt, -ENOTCONN);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 }
 EXPORT_SYMBOL_GPL(xprt_disconnect_done);
 
@@ -683,7 +683,7 @@  EXPORT_SYMBOL_GPL(xprt_disconnect_done);
 void xprt_force_disconnect(struct rpc_xprt *xprt)
 {
 	/* Don't race with the test_bit() in xprt_clear_locked() */
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	/* Try to schedule an autoclose RPC call */
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
@@ -691,7 +691,7 @@  void xprt_force_disconnect(struct rpc_xprt *xprt)
 	else if (xprt->snd_task)
 		rpc_wake_up_queued_task_set_status(&xprt->pending,
 				xprt->snd_task, -ENOTCONN);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 }
 EXPORT_SYMBOL_GPL(xprt_force_disconnect);
 
@@ -725,7 +725,7 @@  xprt_request_retransmit_after_disconnect(struct rpc_task *task)
 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
 {
 	/* Don't race with the test_bit() in xprt_clear_locked() */
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	if (cookie != xprt->connect_cookie)
 		goto out;
 	if (test_bit(XPRT_CLOSING, &xprt->state))
@@ -736,7 +736,7 @@  void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
 		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 	xprt_wake_pending_tasks(xprt, -EAGAIN);
 out:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 }
 
 static bool
@@ -758,18 +758,13 @@  xprt_init_autodisconnect(struct timer_list *t)
 {
 	struct rpc_xprt *xprt = from_timer(xprt, t, timer);
 
-	spin_lock(&xprt->transport_lock);
 	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
-		goto out_abort;
+		return;
 	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
 	xprt->last_used = jiffies;
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
-		goto out_abort;
-	spin_unlock(&xprt->transport_lock);
+		return;
 	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
-	return;
-out_abort:
-	spin_unlock(&xprt->transport_lock);
 }
 
 bool xprt_lock_connect(struct rpc_xprt *xprt,
@@ -778,7 +773,7 @@  bool xprt_lock_connect(struct rpc_xprt *xprt,
 {
 	bool ret = false;
 
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	if (!test_bit(XPRT_LOCKED, &xprt->state))
 		goto out;
 	if (xprt->snd_task != task)
@@ -786,13 +781,13 @@  bool xprt_lock_connect(struct rpc_xprt *xprt,
 	xprt->snd_task = cookie;
 	ret = true;
 out:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 	return ret;
 }
 
 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
 {
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	if (xprt->snd_task != cookie)
 		goto out;
 	if (!test_bit(XPRT_LOCKED, &xprt->state))
@@ -801,7 +796,7 @@  void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
 	xprt->ops->release_xprt(xprt, NULL);
 	xprt_schedule_autodisconnect(xprt);
 out:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 	wake_up_bit(&xprt->state, XPRT_LOCKED);
 }
 
@@ -1411,14 +1406,14 @@  xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
 	xprt_inject_disconnect(xprt);
 
 	task->tk_flags |= RPC_TASK_SENT;
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 
 	xprt->stat.sends++;
 	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
 	xprt->stat.bklog_u += xprt->backlog.qlen;
 	xprt->stat.sending_u += xprt->sending.qlen;
 	xprt->stat.pending_u += xprt->pending.qlen;
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 
 	req->rq_connect_cookie = connect_cookie;
 out_dequeue:
@@ -1769,13 +1764,13 @@  void xprt_release(struct rpc_task *task)
 	else if (task->tk_client)
 		rpc_count_iostats(task, task->tk_client->cl_metrics);
 	xprt_request_dequeue_all(task, req);
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	xprt->ops->release_xprt(xprt, task);
 	if (xprt->ops->release_request)
 		xprt->ops->release_request(task);
 	xprt->last_used = jiffies;
 	xprt_schedule_autodisconnect(xprt);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 	if (req->rq_buffer)
 		xprt->ops->buf_free(task);
 	xprt_inject_disconnect(xprt);
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 6c1fb270f127..26419be782d0 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1359,10 +1359,10 @@  void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	else if (credits > buf->rb_max_requests)
 		credits = buf->rb_max_requests;
 	if (buf->rb_credits != credits) {
-		spin_lock_bh(&xprt->transport_lock);
+		spin_lock(&xprt->transport_lock);
 		buf->rb_credits = credits;
 		xprt->cwnd = credits << RPC_CWNDSHIFT;
-		spin_unlock_bh(&xprt->transport_lock);
+		spin_unlock(&xprt->transport_lock);
 	}
 
 	req = rpcr_to_rdmar(rqst);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index bed57d8b5c19..d1fcc41d5eb5 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -72,9 +72,9 @@  int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
 	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
 		credits = r_xprt->rx_buf.rb_bc_max_requests;
 
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	xprt->cwnd = credits << RPC_CWNDSHIFT;
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 
 	spin_lock(&xprt->queue_lock);
 	ret = 0;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 027a3b07d329..18ffc6190ea9 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -221,9 +221,9 @@  static void handle_connect_req(struct rdma_cm_id *new_cma_id,
 	 * Enqueue the new transport on the accept queue of the listening
 	 * transport
 	 */
-	spin_lock_bh(&listen_xprt->sc_lock);
+	spin_lock(&listen_xprt->sc_lock);
 	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
-	spin_unlock_bh(&listen_xprt->sc_lock);
+	spin_unlock(&listen_xprt->sc_lock);
 
 	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
 	svc_xprt_enqueue(&listen_xprt->sc_xprt);
@@ -396,7 +396,7 @@  static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
 	clear_bit(XPT_CONN, &xprt->xpt_flags);
 	/* Get the next entry off the accept list */
-	spin_lock_bh(&listen_rdma->sc_lock);
+	spin_lock(&listen_rdma->sc_lock);
 	if (!list_empty(&listen_rdma->sc_accept_q)) {
 		newxprt = list_entry(listen_rdma->sc_accept_q.next,
 				     struct svcxprt_rdma, sc_accept_q);
@@ -404,7 +404,7 @@  static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	}
 	if (!list_empty(&listen_rdma->sc_accept_q))
 		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
-	spin_unlock_bh(&listen_rdma->sc_lock);
+	spin_unlock(&listen_rdma->sc_lock);
 	if (!newxprt)
 		return NULL;
 
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index e0195b1a0c18..d7b8e95a61c8 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -880,7 +880,7 @@  static int xs_nospace(struct rpc_rqst *req)
 			req->rq_slen);
 
 	/* Protect against races with write_space */
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 
 	/* Don't race with disconnect */
 	if (xprt_connected(xprt)) {
@@ -890,7 +890,7 @@  static int xs_nospace(struct rpc_rqst *req)
 	} else
 		ret = -ENOTCONN;
 
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 
 	/* Race breaker in case memory is freed before above code is called */
 	if (ret == -EAGAIN) {
@@ -1344,6 +1344,7 @@  static void xs_destroy(struct rpc_xprt *xprt)
 	cancel_delayed_work_sync(&transport->connect_worker);
 	xs_close(xprt);
 	cancel_work_sync(&transport->recv_worker);
+	cancel_work_sync(&transport->error_worker);
 	xs_xprt_free(xprt);
 	module_put(THIS_MODULE);
 }
@@ -1397,9 +1398,9 @@  static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
 	}
 
 
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	xprt_adjust_cwnd(xprt, task, copied);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 	spin_lock(&xprt->queue_lock);
 	xprt_complete_rqst(task, copied);
 	__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
@@ -1509,7 +1510,6 @@  static void xs_tcp_state_change(struct sock *sk)
 	trace_rpc_socket_state_change(xprt, sk->sk_socket);
 	switch (sk->sk_state) {
 	case TCP_ESTABLISHED:
-		spin_lock(&xprt->transport_lock);
 		if (!xprt_test_and_set_connected(xprt)) {
 			xprt->connect_cookie++;
 			clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
@@ -1520,7 +1520,6 @@  static void xs_tcp_state_change(struct sock *sk)
 						   xprt->stat.connect_start;
 			xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING);
 		}
-		spin_unlock(&xprt->transport_lock);
 		break;
 	case TCP_FIN_WAIT1:
 		/* The client initiated a shutdown of the socket */
@@ -1677,9 +1676,9 @@  static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t
  */
 static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task)
 {
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	xprt_adjust_cwnd(xprt, task, -ETIMEDOUT);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 }
 
 static int xs_get_random_port(void)
@@ -2214,13 +2213,13 @@  static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
 	unsigned int opt_on = 1;
 	unsigned int timeo;
 
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	keepidle = DIV_ROUND_UP(xprt->timeout->to_initval, HZ);
 	keepcnt = xprt->timeout->to_retries + 1;
 	timeo = jiffies_to_msecs(xprt->timeout->to_initval) *
 		(xprt->timeout->to_retries + 1);
 	clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 
 	/* TCP Keepalive options */
 	kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
@@ -2245,7 +2244,7 @@  static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt,
 	struct rpc_timeout to;
 	unsigned long initval;
 
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	if (reconnect_timeout < xprt->max_reconnect_timeout)
 		xprt->max_reconnect_timeout = reconnect_timeout;
 	if (connect_timeout < xprt->connect_timeout) {
@@ -2262,7 +2261,7 @@  static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt,
 		xprt->connect_timeout = connect_timeout;
 	}
 	set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 }
 
 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)