Message ID | 20140623224023.1634.67233.stgit@manet.1015granger.net (mailing list archive)
---|---
State | Not Applicable
Would it be possible to delay rpcrdma_buffer_put() until the LOCAL_INV
request's send completion? That is, remove rpcrdma_buffer_put() from
xprt_rdma_free() and add a callback after LOCAL_INV has completed?

Shirley

On 06/23/2014 03:40 PM, Chuck Lever wrote:
> FRMR uses a LOCAL_INV Work Request, which is asynchronous, to
> deregister segment buffers. Other registration strategies use
> synchronous deregistration mechanisms (like ib_unmap_fmr()).
>
> For a synchronous deregistration mechanism, it makes sense for
> xprt_rdma_free() to put segment buffers back into the buffer pool
> immediately once rpcrdma_deregister_external() returns.
>
> This is currently also what FRMR is doing. It is releasing segment
> buffers just after the LOCAL_INV WR is posted.
>
> But segment buffers need to be put back after the LOCAL_INV WR
> _completes_ (or flushes). Otherwise, rpcrdma_buffer_get() can then
> assign these segment buffers to another RPC task while they are
> still "in use" by the hardware.
>
> The result of re-using an FRMR too quickly is that its rkey no
> longer matches the rkey that was registered with the provider.
> This results in FAST_REG_MR or LOCAL_INV Work Requests completing
> with IB_WC_MW_BIND_ERR, and the FRMR, and thus the transport,
> becomes unusable.
>
> Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
> ---
>  net/sunrpc/xprtrdma/verbs.c     |   44 +++++++++++++++++++++++++++++++++++----
>  net/sunrpc/xprtrdma/xprt_rdma.h |    2 ++
>  2 files changed, 42 insertions(+), 4 deletions(-)
Hi Shirley-
On Jun 25, 2014, at 1:17 AM, Shirley Ma <shirley.ma@oracle.com> wrote:
> Would it be possible to delay rpcrdma_buffer_put() until the LOCAL_INV
> request's send completion? That is, remove rpcrdma_buffer_put() from
> xprt_rdma_free() and add a callback after LOCAL_INV has completed?
That’s exactly what this patch does. The relevant part of
rpcrdma_buffer_put() is:
list_add(&mw->mw_list, &buf->rb_mws);
This is now wrapped with a reference count so that
rpcrdma_buffer_put() and the LOCAL_INV completion can run in any
order. The FRMR is added back to the list only after both have
finished.
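For reference, the wrapping is essentially this pair of helpers, condensed
from the diff below (the matching kref_get() calls taken when the Work
Requests are posted are omitted here):

static void
rpcrdma_free_mw(struct kref *kref)
{
	struct rpcrdma_mw *mw = container_of(kref, struct rpcrdma_mw, mw_ref);

	/* back on the free list only when the last reference drops */
	list_add_tail(&mw->mw_list, &mw->mw_pool->rb_mws);
}

static void
rpcrdma_put_mw(struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buffers = mw->mw_pool;
	unsigned long flags;

	/* rb_lock protects rb_mws; the kref itself is atomic */
	spin_lock_irqsave(&buffers->rb_lock, flags);
	kref_put(&mw->mw_ref, rpcrdma_free_mw);
	spin_unlock_irqrestore(&buffers->rb_lock, flags);
}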
Nothing in xprt_rdma_free() is allowed to sleep, so we can’t wait for
LOCAL_INV completion in there.
The only alternative I can think of is having rpcrdma_buffer_get() check
fr_state as it removes FRMRs from the rb_mws list. Only if the FRMR is
marked FRMR_IS_INVALID would rpcrdma_buffer_get() add it to the
rpcrdma_req.
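A rough sketch of that alternative, purely hypothetical and not part of this
patch (the helper name is invented; fr_state, rb_mws and mw_list are the
existing fields, and the caller is assumed to hold rb_lock):

static struct rpcrdma_mw *
rpcrdma_buffer_get_invalid_frmr(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_mw *r, *next;

	list_for_each_entry_safe(r, next, &buffers->rb_mws, mw_list) {
		if (r->r.frmr.fr_state != FRMR_IS_INVALID)
			continue;	/* LOCAL_INV still outstanding */
		list_del(&r->mw_list);
		return r;
	}
	return NULL;	/* caller must cope with a temporary shortage */
}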
--
Chuck Lever
chuck[dot]lever[at]oracle[dot]com
On 06/25/2014 07:32 AM, Chuck Lever wrote:
> Hi Shirley-
>
> On Jun 25, 2014, at 1:17 AM, Shirley Ma <shirley.ma@oracle.com> wrote:
>
>> Would it be possible to delay rpcrdma_buffer_put() until the LOCAL_INV
>> request's send completion? That is, remove rpcrdma_buffer_put() from
>> xprt_rdma_free() and add a callback after LOCAL_INV has completed?
>
> That’s exactly what this patch does. The relevant part of
> rpcrdma_buffer_put() is:
>
> 	list_add(&mw->mw_list, &buf->rb_mws);
>
> This is now wrapped with a reference count so that
> rpcrdma_buffer_put() and the LOCAL_INV completion can run in any
> order. The FRMR is added back to the list only after both have
> finished.

What I was thinking is to run rpcrdma_buffer_put() after the LOCAL_INV
completion, without a reference count.

> Nothing in xprt_rdma_free() is allowed to sleep, so we can’t wait for
> LOCAL_INV completion in there.
>
> The only alternative I can think of is having rpcrdma_buffer_get() check
> fr_state as it removes FRMRs from the rb_mws list. Only if the FRMR is
> marked FRMR_IS_INVALID would rpcrdma_buffer_get() add it to the
> rpcrdma_req.

I thought about that too; an atomic operation would be better than a lock.

> --
> Chuck Lever
> chuck[dot]lever[at]oracle[dot]com
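One way to read that suggestion, purely as an illustration and not part of
the posted patch: claim an FRMR with cmpxchg() on fr_state instead of taking
rb_lock. FRMR_IS_CLAIMED is a hypothetical extra state invented for this
sketch; FRMR_IS_INVALID means the LOCAL_INV for this FRMR has already
completed.

static bool
rpcrdma_frmr_try_claim(struct rpcrdma_mw *r)
{
	/* succeeds only when the hardware no longer owns the FRMR */
	return cmpxchg(&r->r.frmr.fr_state,
		       FRMR_IS_INVALID, FRMR_IS_CLAIMED) == FRMR_IS_INVALID;
}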
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index f24f0bf..52f57f7 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -62,6 +62,8 @@
 #endif
 
 static void rpcrdma_decrement_frmr_rkey(struct rpcrdma_mw *);
+static void rpcrdma_get_mw(struct rpcrdma_mw *);
+static void rpcrdma_put_mw(struct rpcrdma_mw *);
 
 /*
  * internal functions
@@ -167,6 +169,7 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
 		if (fastreg)
 			rpcrdma_decrement_frmr_rkey(mw);
 	}
+	rpcrdma_put_mw(mw);
 }
 
 static int
@@ -1034,7 +1037,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 	len += cdata->padding;
 	switch (ia->ri_memreg_strategy) {
 	case RPCRDMA_FRMR:
-		len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
+		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
 				sizeof(struct rpcrdma_mw);
 		break;
 	case RPCRDMA_MTHCAFMR:
@@ -1076,7 +1079,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 	r = (struct rpcrdma_mw *)p;
 	switch (ia->ri_memreg_strategy) {
 	case RPCRDMA_FRMR:
-		for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
+		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
 			r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
 						ia->ri_max_frmr_depth);
 			if (IS_ERR(r->r.frmr.fr_mr)) {
@@ -1252,12 +1255,36 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 }
 
 static void
-rpcrdma_put_mw_locked(struct rpcrdma_mw *mw)
+rpcrdma_free_mw(struct kref *kref)
 {
+	struct rpcrdma_mw *mw = container_of(kref, struct rpcrdma_mw, mw_ref);
 	list_add_tail(&mw->mw_list, &mw->mw_pool->rb_mws);
 }
 
 static void
+rpcrdma_put_mw_locked(struct rpcrdma_mw *mw)
+{
+	kref_put(&mw->mw_ref, rpcrdma_free_mw);
+}
+
+static void
+rpcrdma_get_mw(struct rpcrdma_mw *mw)
+{
+	kref_get(&mw->mw_ref);
+}
+
+static void
+rpcrdma_put_mw(struct rpcrdma_mw *mw)
+{
+	struct rpcrdma_buffer *buffers = mw->mw_pool;
+	unsigned long flags;
+
+	spin_lock_irqsave(&buffers->rb_lock, flags);
+	rpcrdma_put_mw_locked(mw);
+	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+}
+
+static void
 rpcrdma_buffer_put_mw(struct rpcrdma_mw **mw)
 {
 	rpcrdma_put_mw_locked(*mw);
@@ -1304,6 +1331,7 @@ rpcrdma_buffer_get_mws(struct rpcrdma_req *req, struct rpcrdma_buffer *buffers)
 		r = list_entry(buffers->rb_mws.next,
 			       struct rpcrdma_mw, mw_list);
 		list_del(&r->mw_list);
+		kref_init(&r->mw_ref);
 		r->mw_pool = buffers;
 		req->rl_segments[i].mr_chunk.rl_mw = r;
 	}
@@ -1583,6 +1611,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
 	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
 		__func__, seg1->mr_chunk.rl_mw, i);
 
+	rpcrdma_get_mw(seg1->mr_chunk.rl_mw);
 	if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.fr_state == FRMR_IS_VALID)) {
 		dprintk("RPC:       %s: frmr %x left valid, posting invalidate.\n",
 			__func__,
@@ -1595,6 +1624,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
 		invalidate_wr.send_flags = IB_SEND_SIGNALED;
 		invalidate_wr.ex.invalidate_rkey =
 			seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
+		rpcrdma_get_mw(seg1->mr_chunk.rl_mw);
 		DECR_CQCOUNT(&r_xprt->rx_ep);
 		post_wr = &invalidate_wr;
 	} else
@@ -1638,6 +1668,9 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
 	*nsegs = i;
 	return 0;
 out_err:
+	rpcrdma_put_mw(seg1->mr_chunk.rl_mw);
+	if (post_wr == &invalidate_wr)
+		rpcrdma_put_mw(seg1->mr_chunk.rl_mw);
 	while (i--)
 		rpcrdma_unmap_one(ia, --seg);
 	return rc;
@@ -1653,6 +1686,7 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
 
 	while (seg1->mr_nsegs--)
 		rpcrdma_unmap_one(ia, seg++);
+	rpcrdma_get_mw(seg1->mr_chunk.rl_mw);
 
 	memset(&invalidate_wr, 0, sizeof invalidate_wr);
 	invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
@@ -1664,9 +1698,11 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
 	read_lock(&ia->ri_qplock);
 	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
 	read_unlock(&ia->ri_qplock);
-	if (rc)
+	if (rc) {
+		rpcrdma_put_mw(seg1->mr_chunk.rl_mw);
 		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
 			" status %i\n", __func__, rc);
+	}
 	return rc;
 }
 
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index b81e5b5..7a140fe 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -44,6 +44,7 @@
 #include <linux/spinlock.h>		/* spinlock_t, etc */
 #include <linux/atomic.h>		/* atomic_t, etc */
 #include <linux/workqueue.h>		/* struct work_struct */
+#include <linux/kref.h>
 
 #include <rdma/rdma_cm.h>		/* RDMA connection api */
 #include <rdma/ib_verbs.h>		/* RDMA verbs api */
@@ -176,6 +177,7 @@ struct rpcrdma_mw {
 	} r;
 	struct list_head	mw_list;
 	struct rpcrdma_buffer	*mw_pool;
+	struct kref		mw_ref;
 };
 
 #define RPCRDMA_BIT_FASTREG	(0)
FRMR uses a LOCAL_INV Work Request, which is asynchronous, to
deregister segment buffers. Other registration strategies use
synchronous deregistration mechanisms (like ib_unmap_fmr()).

For a synchronous deregistration mechanism, it makes sense for
xprt_rdma_free() to put segment buffers back into the buffer pool
immediately once rpcrdma_deregister_external() returns.

This is currently also what FRMR is doing. It is releasing segment
buffers just after the LOCAL_INV WR is posted.

But segment buffers need to be put back after the LOCAL_INV WR
_completes_ (or flushes). Otherwise, rpcrdma_buffer_get() can then
assign these segment buffers to another RPC task while they are
still "in use" by the hardware.

The result of re-using an FRMR too quickly is that its rkey no
longer matches the rkey that was registered with the provider.
This results in FAST_REG_MR or LOCAL_INV Work Requests completing
with IB_WC_MW_BIND_ERR, and the FRMR, and thus the transport,
becomes unusable.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 net/sunrpc/xprtrdma/verbs.c     |   44 +++++++++++++++++++++++++++++++++++----
 net/sunrpc/xprtrdma/xprt_rdma.h |    2 ++
 2 files changed, 42 insertions(+), 4 deletions(-)
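For context on the rkey mismatch described above, the registration path
advances the consumer-owned low-order octet of the FRMR's rkey each time it
posts FAST_REG_MR, roughly as in this simplified sketch (the helper name is
invented; ib_update_fast_reg_key() is the verbs interface involved). If the
MW is recycled before its LOCAL_INV completes, the provider's copy of that
octet no longer matches the one computed here, and subsequent FAST_REG_MR or
LOCAL_INV WRs complete with IB_WC_MW_BIND_ERR.

static void
rpcrdma_advance_frmr_key(struct rpcrdma_mw *mw)
{
	struct ib_mr *mr = mw->r.frmr.fr_mr;
	u8 key = mr->rkey & 0x000000FF;

	/* bump the 8-bit key portion that the consumer owns */
	ib_update_fast_reg_key(mr, ++key);
}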